使用SCF Trigger,30行代码搞定 Ckafka To Elasticsearch

昨天刚刚才开始研究Ckafka To Elasticsearch这个方案,如有不太对的地方还请大佬们指教。首先,我们来介绍下概念和场景部分:

SCF触发器:云函数SCF是典型的事件触发(Event-Triggered)形态的无服务器运行环境,核心组件是 SCF 函数和事件源。其中,事件源是发布事件(Event)的腾讯云服务或用户自定义代码,SCF 函数是事件的处理者,而函数触发器就是管理函数和事件源对应关系的集合。

Kafka: Apache Kafka 消息队列引擎,提供高吞吐性能、高可扩展性的消息队列服务。

Elasticsearch :Elasticsearch Service(ES)是基于开源搜索引擎 Elasticsearch 构建的高可用、可伸缩的云端托管 Elasticsearch 服务。Elasticsearch 是一款分布式的基于 RESTful API 的搜索分析引擎,可以应用于日益增多的海量数据搜索和分析等应用场景。

众所周知,在日志处理场景中最常见的其实是ELK (Elasticsearch、Logstash、Kibana), 腾讯云的Elasticsearch其实已经集成了Elasticsearch、Kibana 那在ELK这个使用场景下缺的应该是一个Logstash的角色,如果可以的话我们为什么不用SCF Ckafka的trigger来替代原有的Logstash呢? 其实原理很简单,在trigger上拉到数据,然后原封不动传给Elasticsearch就可以了。

说干就干,千里之行始于足下,我们先搂一眼 Elasticsearch 的SDK:

感觉还是比较清爽的,不过这里选择版本的时候必须选择高于当前使用 Elasticsearch 版本的 SDK 否则会报406错误,血的教训。

我这里操作的话使用的 Elasticsearch 7.x 版本的 Python SDK 链接在这里:https://pypi.org/project/elasticsearch7/

Elasticsearch 对SDK的封装特别简单,基本就是对API简单包了一层,其实自己请求 POST 也是可以满足需求的,当然如果自己愿意在去封装的话。。

这里我直接用的 腾讯云基础版本的Elasticsearch服务作为演示,自建也是同理,只要搞对VPC网络就可以了,画个重点这块需要云函数与Elasticsearch同地域同VPC同子网

搞完之后我们先看下怎么连接到腾讯云的ES把,腾讯云Es连接需要在Elasticsearch函数的参数中设置如下3个参数关闭节点嗅探:

sniff_on_start=False
sniff_on_connection_fail=False
sniffer_timeout=None

这里具体应该是这样的,填上自己的es信息就好了,简单的单条插入是这么写的:

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://xx.xx.xx.xx:9200"],
      http_auth=('user', 'passwd'),
          sniff_on_start=False,
          sniff_on_connection_fail=False,
          sniffer_timeout=None)

res = es.index(index="my_index", doc_type="my_type", id=1, body={"title": "One", "tags": ["ruby"]})
print(res)

千万注意与云函数同地域,不然这块一定会报错。

事情远远没有那么简单,我们考虑的是ELK海量的处理场景,所以这块直接For循环es.index命令一定没法满足我们的要求,之前实验过顺序向es的my_index索引(该索引已存在)写入100条文档,却花费了大约7秒左右,这种速度在大量数据的时候,肯定不行。

后来,想到一个办法通过elasticsearch模块导入helper,通过helper.bulk来批量处理大量的数据。首先我们将所有的数据定义成字典形式,各字段含义如下:

_index对应索引名称,并且该索引必须存在。
_type对应类型名称。
_source对应的字典内,每一篇文档的字段和值,可有有多个字段。

首先将每一篇文档(组成的字典)都整理成一个大的列表,然后,通过helper.bulk(es, action)将这个列表写入到es对象中。然后,这个程序要执行的话——你就要考虑,这个一千万个元素的列表,是否会把你的内存撑爆(MemoryError)!很可能还没到没到写入es那一步,却因为列表过大导致内存错误而使写入程序崩溃!代码如下:

    for i in range(1, 100001, 1000):
        action = ({
            "_index": "my_index",
            "_type": "doc",
            "_source": {
                "title": k
            }
        } for k in range(i, i + 1000))
        helpers.bulk(es, action)

最后的最后,找到了一个方法,将生成器交给es侧去处理,不在函数处理!这样,Python的压力更小了,不过这里的es压力会更大,无论是分批处理还是使用生成器,es的压力都不小,写入操作本来就耗时嘛:

def Taobrss():
    """ 使用生成器批量写入数据 """
    action = ({
        "_index": "my_index",
        "_type": "doc",
        "_source": {
            "title": i
        }
    } for i in range(100000))
    helpers.bulk(es, action)

那么,得到的demo应该是这样的:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers

esServer = "http://172.16.16.53:9200"  # 修改为 es server 地址+端口 E.g. http://172.16.16.53:9200
esUsr = "elastic" # 修改为 es 用户名 E.g. elastic
esPw = "Cc*******" # 修改为 es 密码 E.g. PW2312321321
esIndex = "pre1"  # es中已经创建的 index ,可以直接通过 es.indices.create(index='my-index111')

# ... or specify common parameters as kwargs
es = Elasticsearch([esServer],
      http_auth=(esUsr, esPw),
          sniff_on_start=False,
          sniff_on_connection_fail=False,
          sniffer_timeout=None)

def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        print('共耗时约 {:.2f} 秒'.format(time.time() - start))
        return res

    return wrapper

def main_handler(event, context):
    action = ({      
        "_index": esIndex,
        "_source": {
            "msgBody": record["Ckafka"]["msgBody"] # 获取 Ckafka 触发器 msgBody
        }
    } for record in event["Records"])  # 获取 event Records 字段 数据结构 https://cloud.tencent.com/document/product/583/17530 
    print(action)  
    helpers.bulk(es, action)
    return("successful!")

在触发器拿到 event Records 字段 ,搞到ES 完事大吉。触发器这块的设置如下,可以直接在Ckafka消息转储功能中选择通用模板:

CKafka 实例:配置连接的 CKafka 实例,仅支持选择同地域下的实例。
Topic:支持在 CKafka 实例中已经创建的 Topic。
最大批量消息数:在拉取并批量投递给当前云函数时的最大消息数,目前支持最高配置为10000。结合消息大小、写入速度等因素影响,每次触发云函数并投递的消息数量不一定能达到最大值,而是处在1 – 最大消息数之间的一个变动值。
起始位置:触发器消费消息的起始位置,默认从最新位置开始消费。支持最新、最开始、按指定时间点三种配置。
重试次数:函数发生运行错误(含用户代码错误和 Runtime 错误)时的最大重试次数。

看一眼log和kibana,确认上传信息万事大吉:

代码下载:点击下载