跳转至

组件开发

推荐参考已有的组件开发一个新的组件

公共接口

// 生命周期接口
type Lifecycle interface {
    // 初始化,例如初始化Kafka连接
    Init(context Context)
    // 启动运行,例如开始消费Kafka
    Start()
    // 停止
    Stop()
}

// 描述接口
type Describable interface {
    // 类别,例如source
    Category() Category
    // 类型,例如kafka
    Type() Type
    // 自定义描述
    String() string
}

// 配置获取接口
type Config interface {
    // 获取配置
    Config() interface{}
}

// 组件接口
type Component interface {
    // 生命周期管理
    Lifecycle
    // 描述管理
    Describable
    // 配置管理
    Config
}

source组件

source组件对接数据源输入,开发一个新的source插件需要实现如下接口

// source组件接口
type Source interface {
    Component
    Producer
    // 提交接口,确认sink端成功然后提交
    Commit(events []Event)
}

// 生产接口,source组件需要实现
type Producer interface {
    // 对接数据源
    ProductLoop(productFunc ProductFunc)
}

sink组件

sink组件对接输出端,开发一个新的sink插件需要实现如下接口

 // sink组件接口
type Sink interface {
    Component
    Consumer
}

// 消费接口,sink组件需要实现
type Consumer interface {
    // 对接输出端
    Consume(batch Batch) Result
}

interceptor组件

interceptor组件对事件进行拦截处理,开发一个新的interceptor插件需要实现如下接口

// interceptor组件接口
type Interceptor interface {
    Component
    // 拦截处理
    Intercept(invoker Invoker, invocation Invocation) api.Result
}

Note

请注意新增的组件需要放到pkg/include/include.go的import当中注册