Yukang's Page

Kong源码分析: 事件

2017-07-23

Kong的缓存更新很多依赖于事件,而事件看起来是相对来说比较复杂、也是最有趣的一部分。

worker模型

假设我们对Kong做了一个更改的请求,这个请求通常是通过admin_api这个路由处理的。也就是说最终执行数据库操作的动作是在一个Nginx worker进程里。因为操作了数据库所以我们需要刷新这个Kong节点的所有worker的缓存,而且要把事件分发给其他Kong节点,让其他Kong节点刷新所有worker的缓存。

kong-message

这就涉及到两部分:

  1. Kong节点之间的消息通信, 这是使用serf来实现的
  2. Kong每个节点内部,也就是Nginx worker之间的通信,这是使用lua-resty-worker-events来进行。

发布订阅模式

发布订阅是实现事件的一种经典设计模式,主要需要有两类操作:

  1. 发布消息
  2. 订阅消息,收到消息后触发指定的函数。

Kong使用的是一个叫作mediator_lua,mediator中文意思为”中间人”,很符合项目的意思。可以看到kong/core/events.lua里面实现如下:

function Events:subscribe(event_name, fn)
if fn then
self._mediator:subscribe({event_name}, function(message_t)
fn(message_t)
return nil, true -- Required to tell mediator to continue processing other events
end)
end
end
function Events:publish(event_name, message_t)
if event_name then
self._mediator:publish({string.upper(event_name)}, message_t)
end
end

Kong.init初始化的时候会调用一个叫做attach_hooks的函数:

attach_hooks(events, require "kong.core.hooks")

在load插件的时候也会把插件对应hooks绑定上:

-- Attaching hooks
local ok, hooks = utils.load_module_if_exists("kong.plugins." .. plugin .. ".hooks")
if ok then
attach_hooks(events, hooks)
end

事件的来源

上面说过,Kong节点之间通信是通过serf发送的。我们来看看事件是如何触发发出通知的。
事件来于源数据库的修改,那就应该在数据库代码部分有触发事件的代码,查看dao/dao.lua这个文件里的代码,我们可以看到在insert、update、insert执行的时候都调用了一行代码

event(self, event_types.ENTITY_DELETED, k, v.schema, entity)

这个函数的实现如下,这里做了数据的序列化,然后发布了一种叫做CLUSTER_PROGATE类型的消息:

local function event(self, type, table, schema, data_t)
if self.events_handler then
..... 执行数据序列化
self.events_handler:publish(self.events_handler.TYPES.CLUSTER_PROPAGATE, payload)
end
end

在core/hooks.lua接受消息部分,events.TYPES.CLUSTER_PROPAGATE对应的处理部分是singletons.serf:event(message_t),所以我们看serf.lua这个源文件,最终event调用了invoke_signal,这个函数会运行一个serf命令,类似于这样:

serf event -coalesce=false -rpc-addr=127.0.0.1:7373 kong '{"type":"ENTITY_UPDATED","primary_key":["id"],"collection":"apis","entity":{"id":"94acca76-d61a-429e-86a9-5abf2c61ee31"}}'

这就出发了一个serf event,其他Kong节点会收到此消息。

serf: Kong节点之间通信

那么Kong节点收到消息之后是如何处理的呢?Kong在启动的时候会在后台执行一个serf进程,类似这样:

serf agent -profile wan -bind 0.0.0.0:7946 -log-level err -rpc-addr 127.0.0.1:7373 -event-handler member-join,member-leave,member-failed,member-update,member-reap,user:kong=/usr/local/kong/serf/serf_event.sh -node Kang.local_0.0.0.0:7946_be3b9352808e4839a272f30ca6025650

可以看看serf_event.sh这个脚本,内容如下:

PAYLOAD=`cat` # Read from stdin
if [ "$SERF_EVENT" != "user" ]; then
PAYLOAD="{\"type\":\"${SERF_EVENT}\",\"entity\": \"${PAYLOAD}\"}"
fi
CMD="\
local http = require 'resty.http' \
local client = http.new() \
client:set_timeout(5000) \
client:connect('127.0.0.1', 8001) \
client:request { \
method = 'POST', \
path = '/cluster/events/', \
body = [=[${PAYLOAD}]=], \
headers = { \
['content-type'] = 'application/json' \
} \
}"
/usr/local/openresty/bin/resty -e "$CMD"

可以看到serf收到消息后会触发这个脚本,然后把消息发送到本节点的/cluster/events这个路由。api/routes/cluster.lua这个文件里有收到消息后的处理代码,其中最关键的是:

-- Trigger event in the node
ev.post(constants.CACHE.CLUSTER, message_t.type, message_t)

就是通过resty.worker.events publish出收到的消息,本节点的worker会处理这些消息。

worker刷新缓存

假设当前Kong节点收到一个消息,这个消息是如何分发给各个worker的?从代码看出,在Kong初始化的时候有调用一个叫做kong.lua里面的Kong.init_worker()函数,其中有一段代码注册了event handler:

local worker_events = require "resty.worker.events"
local handler = function(data, event, source, pid)
if data and data.collection == "apis" then
assert(core.build_router())
elseif source and source == constants.CACHE.CLUSTER then
singletons.events:publish(event, data)
end
end
worker_events.register(handler)

可以从上面的handler代码看到,一个worker接收到消息之后执行的是:

singletons.events:publish(event, data)

也就是通过mediator_lua再把消息publish。之前初始化的时候已经attach_hooks了各种handler,这时候那些注册的函数才会被最终执行,比如核心的刷新缓存部分代码在core/hooks.lua的invalidate函数里面。

回顾

总的来说Kong事件部分的代码相当精妙,也很统一。比如当前worker做了修改,这个事件会发送给各个节点,包括当前自己所在的节点。通过发布订阅模式,写代码的时候只需关心消息发送、接受消息索要处理的逻辑。

Tags: Lua
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

扫描二维码,分享此文章