跳转至

3.4、Kafka

功能模块如图所示:

积木功能

连接Kafka

功能:这个节点允许定义与Kafka集群建立连接所需的信息。

积木功能

配置界面功能说明:

设置:

  • 连接地址:输入建立到Kafka集群的连接的地址。该地址必须以host1:port1,host2:port2,...的形式输入。
  • 连接验证超时(毫秒):等待Kafka集群响应的时间。如果在给定的时间窗口内没有收到响应,则连接无效。
  • 高级设置:可以指定额外的连接相关选项,例如超时和Kerberos身份验证等。

Kafka生产者

功能:这个节点可以向Kafka发送消息。

积木功能

配置界面功能说明:

设置:

  • 客户端id:发出请求时传递给Kafka集群的id字符串。该字符串可以作为逻辑应用程序名包含在服务器端请求日志记录中,从而更容易跟踪ip/port以外的请求源。
  • 主题:生产者将消息发送到的主题列表。主题列表必须以topic1,topic2,... .的形式输入。
  • 列消息:一个字符串或JSON列,包含必须发送到Kafka集群的消息。
  • 发送类型:定义如何发送消息。如果选择异步或同步,如果任何消息无法发送到Kafka,节点将抛出异常。在Fire和forget的情况下,消息是否已成功发送是未知的。

事务设置:

  • 使用事务:指定消息是否必须以事务方式发送。
  • 事务id:消息生产者的事务id。这个id必须是唯一的。
  • 事务提交选项:允许指定每个事务发送的消息数。

高级设置:

  • 高级设置:允许指定额外的生产者相关的配置,例如,请求超时和重试次数。

Kafka消费者

功能:这个节点可以使用Kafka集群中给定主题的消息,并将它们存储在一个表中。

积木功能

配置界面功能说明:

设置:

  • 客户端id:发出请求时传递给Kafka集群的id字符串。该字符串可以作为逻辑应用程序名包含在服务器端请求日志记录中,从而更容易跟踪ip/port以外的请求源。
  • 组id:使用者所属的组的唯一标识。
  • 主题:使用者订阅的主题模式或列表。模式必须是这里定义的正则表达式。主题列表必须以topic1,topic2,... .的形式输入。
  • 主题模式:如果选择此选项,则主题的值将被视为模式而不是列表。
  • 附加消息信息列:如果选择此选项,将向输出表中添加其他列,其中包含关于消息/行的详细信息。更准确地说,将添加以下列:主题、分区、偏移量、消息创建日期。
  • 将消息转换为JSON:如果选择此选项,则使用的消息将从string转换为JSON。
  • 每次轮询的最大消息数:在单个请求中轮询的最大消息数。
  • 轮询超时时间(ms):用于等待轮询请求中的消息的时间(以毫秒为单位)。
  • 终止条件:确定如何停止此节点的执行。
    • 当使用的消息数量超过时停止:一旦没有更多的消息可用,该节点将停止,但最迟在使用了指定数量的消息之后停止。
    • 当消息时间戳超时停止:使用在选定日期和时间之前创建的所有消息。如果选择的日期和时间是将来的,一旦轮询请求没有返回任何消息,执行就会停止。
  • 高级设置:可以指定额外的消费者相关配置,例如,轮询间隔和会话超时。