跳转至

transformer

带有条件判断的函数式数据处理interceptor。
属于source interceptor。

使用场景

  • 日志提取出日志级别level,并且drop掉DEBUG日志
  • 日志里混合包括有json和plain的日志形式,可以判断json形式的日志并且进行处理
  • 根据访问日志里的status code,增加不同的topic字段
  • ...

示例参考

使用方式

transformer会按照配置的actions里顺序执行所有的action。action类似函数的方式,可以写入参数,参数一般为event里的字段。
同时,每个action里还可能包括额外的控制字段。比如下面regex(body),body即为regex的参数,pattern为额外的字段。

interceptors:
  - type: transformer
    actions:
      - action: regex(body)
        pattern: ^(?P<time>[^ ^Z]+Z) (?P<level>[^ ]*) (?P<log>.*)$
      - action: add(topic, common)

另外,action还支持条件判断if-then-else的方式:

- if: <condition>
  then:
    - action: funcA()
  else:
    - action: funcB()

其中,condition条件判断也为函数的形式。

interceptors:
  - type: transformer
    actions:
      - if: equal(status, 404)
        then:
          - action: add(topic, not_found)
          - action: return()

action

公共字段

ignoreError

  • ignoreError: 表示是否忽略该action处理过程中的错误,并且不会打印错误日志。

Example

- type: transformer
  actions:
    - action: regex(body)
      pattern: (?<ip>\S+) (?<id>\S+) (?<u>\S+) (?<time>\[.*?\]) (?<url>\".*?\") (?<status>\S+) (?<size>\S+)
      ignoreError: true
这里的ignoreError设置为true,表示会忽略该正则匹配的错误,并且会继续执行后续的action。

dropIfError

表示如果出现错误,直接丢弃该条event。

Example

- type: transformer
  actions:
    - action: regex(body)
      pattern: (?<ip>\S+) (?<id>\S+) (?<u>\S+) (?<time>\[.*?\]) (?<url>\".*?\") (?<status>\S+) (?<size>\S+)
      dropIfError: true
这里的dropIfError设置为true,表示如果出现正则匹配的错误,会直接丢弃这条日志(后续action也不会执行)。

add(key, value)

给event添加额外的key:value。

Example

- action: add(topic, loggie)

input:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body"
}

output:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "topic": "loggie"
}

copy(from, to)

复制event里的字段。

参数:

  • from: 原有的key
  • to: 复制后的key

Example

- action: copy(foo, bar)

input:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "foo": "loggie"
}

output:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "foo": "loggie",
  "bar": "loggie"
}

move(from, to)

移动/重命名字段。

参数:

  • from: 原有的key
  • to: 移动后的key

Example

- action: move(foo, bar)

input:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "foo": "loggie"
}

output:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "bar": "loggie"
}

set(key, value)

更新字段key的值为value。

参数:

  • key: 需更新的字段
  • value: 更新的后的值

Example

- action: set(foo, test)

input:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "foo": "loggie"
}

output:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "foo": "test"
}

del(key1, key2...)

删除字段。可填写多个字段key。

Example

- action: del(foo)

input:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "foo": "loggie"
}

output:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
}

underRoot(key)

将嵌套的字段放在根部(最外层)。

Example

- action: underRoot(state)

input:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "state": {
    "node": "127.0.0.1",
    "phase": "running"
  }
}

output:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "node": "127.0.0.1",
  "phase": "running"
}

fmt(key)

将某个字段的值重新渲染,可根据其他的字段值组成一个值。如果key不存在则会新增该字段。

额外字段:

  • pattern: 必填,表示格式化的规则,比如${state.node}-${state.phase}。如果pattern为固定值,则类似set(key, value)。

Example

- action: fmt(status)
  pattern: ${state.node} is ${state.phase}

input:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "state": {
    "node": "127.0.0.1",
    "phase": "running"
  }
}

output:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "state": {
    "node": "127.0.0.1",
    "phase": "running"
  }
  "status": "127.0.0.1 is running"
}

timestamp(key)

字段的时间格式转换。

额外字段:

  • fromLayout: 必填,指定字段的时间格式(golang形式),也可为unixunix_ms
  • fromLocation: 指定字段的时区,也可为UTC或者Local,如果为空,则为UTC
  • toLayout: 必填,转换后的时间格式(golang形式),也可为unixunix_ms
  • toLocation: 转换后的时间时区,也可为UTC或者Local,如果为空,则为UTC

Example

- action: timestamp(time)
  fromLayout: "2006-01-02 15:04:05"
  fromLocation: Asia/Shanghai
  toLayout: unix_ms
  toLocation: Local

input:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "time": "2022-06-28 11:24:35"
}

output:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "time": 1656386675000
}

以上的layout参数需要填写golang形式,可参考:

const (
    Layout      = "01/02 03:04:05PM '06 -0700" // The reference time, in numerical order.
    ANSIC       = "Mon Jan _2 15:04:05 2006"
    UnixDate    = "Mon Jan _2 15:04:05 MST 2006"
    RubyDate    = "Mon Jan 02 15:04:05 -0700 2006"
    RFC822      = "02 Jan 06 15:04 MST"
    RFC822Z     = "02 Jan 06 15:04 -0700" // RFC822 with numeric zone
    RFC850      = "Monday, 02-Jan-06 15:04:05 MST"
    RFC1123     = "Mon, 02 Jan 2006 15:04:05 MST"
    RFC1123Z    = "Mon, 02 Jan 2006 15:04:05 -0700" // RFC1123 with numeric zone
    RFC3339     = "2006-01-02T15:04:05Z07:00"
    RFC3339Nano = "2006-01-02T15:04:05.999999999Z07:00"
    Kitchen     = "3:04PM"
    // Handy time stamps.
    Stamp      = "Jan _2 15:04:05"
    StampMilli = "Jan _2 15:04:05.000"
    StampMicro = "Jan _2 15:04:05.000000"
    StampNano  = "Jan _2 15:04:05.000000000"
)
还可以根据实际情况修改。

regex(key)

使用正则的方式切分日志,提取字段。 另外也可以为regex(key, to)。

参数:

  • key: 必填,正则提取的字段
  • to: 非必填,提取后所有的字段放置到的key。默认为空,表示将字段提取到根部

额外字段:

  • pattern: 必填,正则表达式

Example

- action: regex(body)
  pattern: (?<ip>\S+) (?<id>\S+) (?<u>\S+) (?<time>\[.*?\]) (?<url>\".*?\") (?<status>\S+) (?<size>\S+)

input:

{
  "body": "10.244.0.1 - - [13/Dec/2021:12:40:48 +0000] 'GET / HTTP/1.1' 404 683",
}

output:

{
  "ip":     "10.244.0.1",
  "id":     "-",
  "u":      "-",
  "time":   "[13/Dec/2021:12:40:48 +0000]",
  "url":    "GET / HTTP/1.1",
  "status": "404",
  "size":   "683",
}

grok(key)

使用grok的方式切分日志,提取字段。 另外也可以为grok(key, to)。

参数:

  • key: 必填,grok提取的字段
  • to: 非必填,提取后所有的字段放置到的key。默认为空,表示将字段提取到根部

额外字段:

  • match: 必填,grok表达式
  • ignoreBlank: 非必填,默认true,是否忽略空字段,如果解析得到的字段key的结果为"",那么结果不会写入key:""
  • pattern: 非必填,自定义pattern
  • patternPaths: 非必填,获取pattern的路径,支持url和path,其中url为解析get请求的response。这里提供一个实例的url;path则为本地路径,如果填写的是目录则会拿目录下所有文件内可能包含的规则

Example

- action: grok(body)
  match: "^%{DATESTAMP:datetime} %{FILE:file}:%{INT:line}: %{IPV4:ip} %{PATH:path} %{UUID:uuid}(?P<space>[a-zA-Z]?)"
  pattern: 
    FILE: "[a-zA-Z0-9._-]+"

input:

{
  "body": "2022/05/28 01:32:01 logTest.go:66: 192.168.0.1 /var/log/test.log 54ce5d87-b94c-c40a-74a7-9cd375289334",
}

output:

        "datetime": "2022/05/28 01:32:01",
            "line":     "66",
            "ip":       "192.168.0.1",
            "path":     "/var/log/test.log",
            "uuid":     "54ce5d87-b94c-c40a-74a7-9cd375289334",

jsonDecode(key)

将json文本反序列化。 也可以为jsonDecode(key, to)。

参数:

  • key: 必填,对应的字段key
  • to: 非必填,提取后所有的字段放置到的key。默认为空,则表示将字段提取到根部

Example

- action: jsonDecode(body)

input:

{
  "body": `{"log":"I0610 08:29:07.698664 Waiting for caches to sync", "stream":"stderr", "time":"2021-06-10T08:29:07.698731204Z"}`,
}

output:

{
  "log": "I0610 08:29:07.698664 Waiting for caches to sync",
  "stream": "stderr", 
  "time": "2021-06-10T08:29:07.698731204Z"
}

jsonEncode(key)

将多个字段序列化成json string形式。 也可以为jsonEncode(key, to)。

参数:

  • key: 必填,对应的字段key
  • to: 非必填,提取后所有的字段放置到的key。默认为空,则表示将字段提取到根部

Example

interceptors:
  - type: transformer
    actions:
    - action: jsonEncode(fields)

input:

{
  "body": "this is test",
  "fields":
    "topic": "loggie",
    "foo": "bar"
}

output:

 {
  "fields": "{\"topic\":\"loggie\",\"foo\":\"bar\"}",
  "body": "this is test"
}

split(key)

将一行日志根据某种分割符切分。

参数:

  • key: 必填,对应的字段key
  • to: 非必填,提取后所有的字段放置到的key。默认为空,则表示将字段提取到根部

额外字段:

  • separator: 分隔符,string,必填
  • max: 通过分割符分割后得到的最多的字段数,int,非必填,默认值为-1
  • keys: 分割后字段对应的key,string数组,必填

Example

  interceptors:
  - type: transformer
    actions:
      - action: split(body)
        separator: "|"
        keys: ["time", "order", "service", "price"]

input:

  "body": `2021-08-08|U12345|storeCenter|13.14`,

output:

  "time": "2021-08-08"
  "order": "U12345"
  "service": "storeCenter"
  "price: "13.14"

strconv(key, type)

字段值类型转换。

参数:

  • key: 目标字段
  • type: 转换后的类型,可为bool, int, float

Example

- action: strconv(code, int)

input:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "code": "200"
}

output:

{
  "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  "code": 200
}

print()

打印event。一般用于调试阶段使用。

return()

控制类型函数,执行到return()后返回,不再继续执行下面的action。

dropEvent()

控制类型函数,执行到dropEvent()后会将该event直接丢弃。这意味着该条数据会丢失,也不会继续被后续的interceptor或者sink处理消费。

Example

interceptors:
  - type: transformer
    actions:
      - action: regex(body)
        pattern: ^(?P<time>[^ ^Z]+Z) (?P<level>[^ ]*) (?P<log>.*)$
      - if: equal(level, DEBUG)
        then:
          - action: dropEvent()
假设日志为:2021-02-16T09:21:20.545525544Z DEBUG this is log body,则满足level字段为DEBUG,会直接丢弃该条日志。

condition

条件判断类函数。

操作符

  • AND:表示两个condition执行结果的

Example

interceptors:
  - type: transformer
    actions:
      - if: equal(level, DEBUG) AND equal(code, 200)
        then:
          - action: dropEvent()
  • OR:表示两个condition执行结果的

Example

interceptors:
  - type: transformer
    actions:
      - if: equal(level, DEBUG) OR equal(level, INFO)
        then:
          - action: dropEvent()
  • NOT:表示对condition执行结果取

Example

interceptors:
  - type: transformer
    actions:
      - if: NOT equal(level, DEBUG)
        then:
          - action: dropEvent()

equal(key, target)

目标字段值是否和参数值target相等。

contain(key, target)

字段值是否包含参数值target。

Caution

target请直接使用字符串,无需添加双引号。 比如contain(body, error),而不是contain(body, "error")contain(body, "error")会被当作"error"来匹配。

exist(key)

目标字段是否存在或者是否为空。

greater(key, value)

目标字段的值是否大于参数值value。

less(key, value)

目标字段的值是否小于参数值value。

hasPrefix(key, prefix)

目标字段的值是否包含prefix前缀。

match(key, regex)

目标字段的值是否和参数regex正则匹配。

oneOf(key, value1, value2...)

目标字段的值是否是参数值value1...其中之一。