Skip to content

kafka-logger

API 网关 APIX 的 kafka-logger 插件用于将日志作为 JSON 对象推送到 Apache Kafka 集群中。

描述

kafka-logger 插件用于将日志作为 JSON 对象推送到 Apache Kafka 集群中。可用作 ngx_lua NGINX 模块的 Kafka 客户端驱动程序。

属性

名称类型必选项默认值有效值描述
broker_listobject已废弃,现使用 brokers 属性代替。原指需要推送的 Kafka 的 broker 列表。
brokersarray需要推送的 Kafka 的 broker 列表。
brokers.hoststringKafka broker 的节点 host 配置,例如 192.168.1.1
brokers.portstringKafka broker 的节点端口配置
brokers.sasl_configobjectKafka broker 中的 sasl_config
brokers.sasl_config.mechanismstring"PLAIN"["PLAIN"]Kafka broker 中的 sasl 认证机制
brokers.sasl_config.userstringKafka broker 中 sasl 配置中的 user,如果 sasl_config 存在,则必须填写
brokers.sasl_config.passwordstringKafka broker 中 sasl 配置中的 password,如果 sasl_config 存在,则必须填写
kafka_topicstring需要推送的 topic。
producer_typestringasync["async", "sync"]生产者发送消息的模式。
required_acksinteger1[0, 1, -1]生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka acks 属性相同,具体配置请参考 Apache Kafka 文档
keystring用于消息分区而分配的密钥。
timeoutinteger3[1,...]发送数据的超时时间。
namestring"kafka logger"batch processor 的唯一标识。
meta_formatenum"default"["default","origin"]default:获取请求信息以默认的 JSON 编码方式。origin:获取请求信息以 HTTP 原始请求方式。更多信息,请参考 meta_format
log_formatobject以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 $ 开头,则表明是要获取 APISIX 变量NGINX 内置变量
include_req_bodybooleanfalse[false, true]当设置为 true 时,包含请求体。注意:如果请求体无法完全存放在内存中,由于 NGINX 的限制,APISIX 无法将它记录下来。
include_req_body_exprarrayinclude_req_body 属性设置为 true 时进行过滤。只有当此处设置的表达式计算结果为 true 时,才会记录请求体。更多信息,请参考 lua-resty-expr
include_resp_bodybooleanfalse[false, true]当设置为 true 时,包含响应体。
include_resp_body_exprarrayinclude_resp_body 属性设置为 true 时进行过滤。只有当此处设置的表达式计算结果为 true 时才会记录响应体。更多信息,请参考 lua-resty-expr
cluster_nameinteger1[0,...]Kafka 集群的名称,当有两个及以上 Kafka 集群时使用。只有当 producer_type 设为 async 模式时才可以使用该属性。
producer_batch_numinteger200[1,...]对应 lua-resty-kafka 中的 batch_num 参数,聚合消息批量提交,单位为消息条数。
producer_batch_sizeinteger1048576[0,...]对应 lua-resty-kafka 中的 batch_size 参数,单位为字节。
producer_max_bufferinginteger50000[1,...]对应 lua-resty-kafka 中的 max_buffering 参数,表示最大缓冲区,单位为条。
producer_time_lingerinteger1[1,...]对应 lua-resty-kafka 中的 flush_time 参数,单位为秒。
meta_refresh_intervalinteger30[1,...]对应 lua-resty-kafka 中的 refresh_interval 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。

该插件支持使用批处理器来聚合并批量处理条目(日志/数据)。这样可以避免插件频繁地提交数据,默认设置情况下批处理器会每 5 秒钟或队列中的数据达到 1000 条时提交数据,如需了解批处理器相关参数设置,请参考 Batch-Processor 配置部分。

提示

数据首先写入缓冲区。当缓冲区超过 batch_max_sizebuffer_duration 设置的值时,则会将数据发送到 Kafka 服务器并刷新缓冲区。

如果发送成功,则返回 true。如果出现错误,则返回 nil,并带有描述错误的字符串 buffer overflow

meta_format 示例

  • default:

    json
    {
     "upstream": "127.0.0.1:1980",
     "start_time": 1619414294760,
     "client_ip": "127.0.0.1",
     "service_id": "",
     "route_id": "1",
     "request": {
       "querystring": {
         "ab": "cd"
       },
       "size": 90,
       "uri": "/hello?ab=cd",
       "url": "http://localhost:1984/hello?ab=cd",
       "headers": {
         "host": "localhost",
         "content-length": "6",
         "connection": "close"
       },
       "body": "abcdef",
       "method": "GET"
     },
     "response": {
       "headers": {
         "connection": "close",
         "content-type": "text/plain; charset=utf-8",
         "date": "Mon, 26 Apr 2021 05:18:14 GMT",
         "server": "APISIX/2.5",
         "transfer-encoding": "chunked"
       },
       "size": 190,
       "status": 200
     },
     "server": {
       "hostname": "localhost",
       "version": "2.5"
     },
     "latency": 0
    }
  • origin:

    http
    GET /hello?ab=cd HTTP/1.1
    host: localhost
    content-length: 6
    connection: close
    
    abcdef

插件元数据

名称类型必选项默认值描述
log_formatobject以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 $ 开头,则表明是要获取 APISIX 变量NGINX 内置变量

:::note 注意

该设置全局生效。如果指定了 log_format,则所有绑定 kafka-logger 的路由或服务都将使用该日志格式。

:::

以下示例展示了如何通过 Admin API 配置插件元数据:

shell
curl http://127.0.0.1:9180/apix/admin/plugin_metadata/kafka-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "log_format": {
        "host": "$host",
        "@timestamp": "$time_iso8601",
        "client_ip": "$remote_addr"
    }
}'

配置完成后,你将在日志系统中看到如下类似日志:

shell
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}

如何启用

你可以通过如下命令在指定路由上启用 kafka-logger 插件:

shell
curl http://127.0.0.1:9180/apix/admin/routes/1 \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "plugins": {
       "kafka-logger": {
            "brokers" : [
              {
               "host": "127.0.0.1",
               "port": 9092
              }
            ],
           "kafka_topic" : "test2",
           "key" : "key1"
       }
    },
    "upstream": {
       "nodes": {
           "127.0.0.1:1980": 1
       },
       "type": "roundrobin"
    },
    "uri": "/hello"
}'

该插件还支持一次推送到多个 Broker,示例如下:

json
"brokers" : [
    {
      "host" :"127.0.0.1",
      "port" : 9092
    },
    {
      "host" :"127.0.0.1",
      "port" : 9093
    }
],

测试插件

你可以通过以下命令向 APIX 发出请求:

shell
curl -i http://127.0.0.1:9080/hello

禁用插件

当你需要禁用该插件时,可以通过如下命令删除相应的 JSON 配置,APISIX 将会自动重新加载相关配置,无需重启服务:

shell
curl http://127.0.0.1:9180/apix/admin/routes/1 \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "methods": ["GET"],
    "uri": "/hello",
    "plugins": {},
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "127.0.0.1:1980": 1
        }
    }
}'