跳转至

kafka

使用sink kafka将日志数据发送至下游Kafka。

Example

sink:
  type: kafka
  brokers: ["127.0.0.1:6400"]
  topic: "log-${fields.topic}"
sink:
  type: kafka
  brokers: ["127.0.0.1:6400"]
  topic: "demo"
  sasl:
    type: scram
    username: ***
    password: ***
    algorithm: sha512

支持的Kafka版本

该kafka sink使用的是segmentio/kafka-go库。当前Loggie使用的库版本为v0.4.39,对应版本测试支持的Kafka版本为:0.10.1.0 - 2.7.1

brokers

字段 类型 是否必填 默认值 含义
brokers string数组 必填 发送日志至Kafka的brokers地址

topic

字段 类型 是否必填 默认值 含义
topic string 非必填 loggie 发送日志至Kafka的topic

可使用${a.b}的方式,获取event里的字段值作为具体的topic名称。

比如,一个event为:

{
  "topic": "loggie",
  "hello": "world"
}
可配置topic: ${topic},此时该event发送到Kafka的topic为"loggie"。

同时支持嵌套的选择方式:

{
  "fields": {
    "topic": "loggie"
  },
  "hello": "world"
}
可配置topic: ${fields.topic},同样也会发送到topic "loggie"。

ifRenderTopicFailed

字段 类型 是否必填 默认值 含义
ifRenderTopicFailed 非必填 如果使用动态的规则渲染topic,比如topic: ${fields.topic},有可能渲染失败(比如日志没有fields.topic字段),下面配置指示失败后的动作。
ifRenderTopicFailed.dropEvent 非必填 true 默认为丢弃
ifRenderTopicFailed.ignoreError 非必填 忽略错误日志,请注意,这里只是不打印错误日志
ifRenderTopicFailed.defaultTopic 非必填 发送至该默认的topic,配置后dropEvent不生效

Example

sink:
  type: kafka
  brokers: ["127.0.0.1:6400"]
  topic: "log-${fields.topic}"
  ifRenderTopicFailed:
    dropEvent: true
sink:
  type: kafka
  brokers: ["127.0.0.1:6400"]
  topic: "log-${fields.topic}"
  ifRenderTopicFailed:
    ignoreError: true
    defaultTopic: default

ignoreUnknownTopicOrPartition

字段 类型 是否必填 默认值 含义
ignoreUnknownTopicOrPartition 非必填 用于当发送的topic不存在时,忽略Kafka返回UNKNOWN_TOPIC_OR_PARTITION的报错
  • 这种情况一般发生在使用动态渲染的topic,但是环境里的Kafka关闭了自动创建topic,导致无法发送至渲染出来的topic。默认情况Loggie会一直不停的重试,从而无法发送新的日志。
  • 开启ignoreUnknownTopicOrPartition后,会直接丢弃发送的日志,避免影响其他正常包含已存在topic的日志发送。
  • 请注意和上面ifRenderTopicFailed的区别,ifRenderTopicFailed是动态渲染不出来topic或者渲染的为空值,而ignoreUnknownTopicOrPartition则是渲染成功,但是topic在Kafka中实际不存在。

balance

字段 类型 是否必填 默认值 含义
balance string 非必填 roundRobin 负载均衡策略,可填hashroundRobinleastBytes

compression

字段 类型 是否必填 默认值 含义
compression string 非必填 gzip 日志发送至Kafka的压缩策略,可填gzipsnappylz4zstd

maxAttempts

字段 类型 是否必填 默认值 含义
maxAttempts int 非必填 10 发送最多重试次数

batchSize

字段 类型 是否必填 默认值 含义
batchSize int 非必填 100 发送时每个batch最多包含的数据个数

batchBytes

字段 类型 是否必填 默认值 含义
batchBytes int 非必填 1048576 每个发送请求包含的最大字节数

batchTimeout

字段 类型 是否必填 默认值 含义
batchTimeout time.Duration 非必填 1s 形成每个发送batch的最长时间

readTimeout

字段 类型 是否必填 默认值 含义
readTimeout time.Duration 非必填 10s 读取超时时间

writeTimeout

字段 类型 是否必填 默认值 含义
writeTimeout time.Duration 非必填 10s 写入超时时间

requiredAcks

字段 类型 是否必填 默认值 含义
requiredAcks int 非必填 0 等待ack参数,可为01-1
  • 0: 不要求ack
  • 1: 等待leader partition ack
  • -1: 等待ISR中所有replica ack

sasl

字段 类型 是否必填 默认值 含义
sasl 非必填 SASL authentication
sasl.type string 必填 SASL类型,可为:plainscram
sasl.username string 必填 用户名,请注意在v1.4和之前使用的名称为userName,当然后续版本也对此进行了兼容
sasl.password string 必填 密码
sasl.algorithm string type=scram时必填 type=scram时使用的算法,可选sha256sha512

partitionKey

字段 类型 是否必填 默认值 含义
partitionKey string 非必填 控制发送至topic下哪个分区

与topic相似,可使用${a.b}的方式,获取event里的字段值作为具体的topic名称。