• <small id="zzd1k"><th id="zzd1k"></th></small>

    <small id="zzd1k"><xmp id="zzd1k"></xmp></small><menu id="zzd1k"></menu>
  • <menu id="zzd1k"></menu>
  • <input id="zzd1k"><tt id="zzd1k"></tt></input>
    Java人工智能+Pythonweb前端UI/UE设计PHP+H5全栈工程师C/C++云计算大数据新媒体软件测试产品经理电商运营网络安全+运维Go语言与区块链影视制作PMP项目管理认证iOSAndroid+物联网.NET

    Apache Flume timestamp和host拦截器使用

    来源:黑马程序员

    浏览107人

    2019.09.19


    一、 Flume拦截器介绍

    拦截器是简单的插件式组件,设置在source和channel之间。source接收到的时间,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。可以自定义拦截器。

    flume内置了很多拦截器,并且会定期的添加一些拦截器,下面我们学习flume内置的,两个经常使用的拦截器。

    1. Timestamp Interceptor(时间戳拦截器)

    flume中一个最经常使用的拦截器 ,该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接受到的只有message。时间戳拦截器的配置。 

    参数默认值描述type类型名称timestamp,也可以使用类名的全路径。preserveExisting false 如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值。

    a1.sources.r1.interceptors = timestamp 

    a1.sources.r1.interceptors.timestamp.type=timestamp 

    a1.sources.r1.interceptors.timestamp.preserveExisting=false

    2. Host Interceptor(主机拦截器)

    主机拦截器插入服务器的ip地址或者主机名,agent将这些内容插入到事件的报头中。时间报头中的key使用hostHeader配置,默认是host。主机拦截器的配置参数 默认值 描述 type   类型名称host hostHeader host 事件投的key useIP true 如果设置为false,host键插入主机名 preserveExisting false 如果设置为true,若事件中报头已经存在,不会替换host报头的值

    a1.sources.r1.interceptors = host 

    a1.sources.r1.interceptors.host.type=host 

    a1.sources.r1.interceptors.host.useIP=false 

    a1.sources.r1.interceptors.timestamp.preserveExisting=true

    二、 业务需求

    使用flume内置拦截器完成如下需求:

     1568880664274843.png

    1. agent配置

    # 01 define agent name, source/sink/channel name

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # 02 source,http,jsonhandler

    a1.sources.r1.type = http

    a1.sources.r1.bind = master

    a1.sources.r1.port = 6666

    a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

    # 03 timestamp and host interceptors work before source

    a1.sources.r1.interceptors = i1 i2       # 两个interceptor串联,依次作用于event

    a1.sources.r1.interceptors.i1.type = timestamp 

    a1.sources.r1.interceptors.i1.preserveExisting = false  

    a1.sources.r1.interceptors.i2.type = host  

    # flume event的头部将添加 “hostname”:实际主机名

    a1.sources.r1.interceptors.i2.hostHeader = hostname  # 指定key,value将填充为flume agent所在节点的主机名

    a1.sources.r1.interceptors.i2.useIP = false          # IP和主机名,二选一即可

    # 04 hdfs sink

    a1.sinks.k1.type = hdfs  

    a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/%Y-%m-%d/   # hdfs sink将根据event header中的时间戳进行替换

    # 和hostHeader的值保持一致,hdfs sink将提取event中key为hostnmae的值,基于该值创建文件名前缀

    a1.sinks.k1.hdfs.filePrefix = %{hostname}   # hdfs sink将根据event header中的hostnmae对应的value进行替换

    a1.sinks.k1.hdfs.fileType = DataStream

    a1.sinks.k1.hdfs.writeFormat = Text

    a1.sinks.k1.hdfs.rollInterval = 0

    a1.sinks.k1.hdfs.rollCount = 10

    a1.sinks.k1.hdfs.rollSize = 1024000

    # channel,memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # bind source,sink to channel 

    a1.sinks.k1.channel = c1

    a1.sources.r1.channels = c1

    三、 验证拦截器效果

    1. 验证思路

    1)先将interceptor作用后的event,通过logger sink打印到console,验证header是否正常添加

    2)修改sink为hdfs, 观察目录和文件的名称是否能够按照预期创建(时间戳-目录,hostname-文件前缀)

    2. 验证过程

    1)发送header为空的http请求,logger sink打印event到终端,观察event header中是否被添加了timestamp以及hostname

    # 01 define agent name, source/sink/channel name

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # 02 source,http,jsonhandler

    a1.sources.r1.type = http

    a1.sources.r1.bind = node-1

    a1.sources.r1.port = 8888

    a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

    # 03 timestamp and host interceptors work before source

    a1.sources.r1.interceptors = i1 i2

    a1.sources.r1.interceptors.i1.type = timestamp 

    a1.sources.r1.interceptors.i1.preserveExisting = false   

    a1.sources.r1.interceptors.i2.type = host  

    a1.sources.r1.interceptors.i2.hostHeader = hostname

    a1.sources.r1.interceptors.i2.useIP = false

    # 04 hdfs sink

    a1.sinks.k1.type = logger  

    # channel,memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # bind source,sink to channel 

    a1.sinks.k1.channel = c1

    a1.sources.r1.channels = c1

    启动flume agent

    bin/flume-ng agent -c ./conf -f ./conf/http_sink_logger_source.conf -n a1 -Dflume.root.logger=INFO,console

    发送请求测试:

    curl -X POST -d '[{"header":{},"body":"time-host-interceptor001"}]' http://node-1:8888

    可以看到终端输出的event header中已经有了拦截器的信息

     1568880696499426.png


    修改sink为hdfs, 观察HDFS的目录名(时间戳)和文件前缀(hostnme)

     1568880706117614.png

    目录名被正常替换(基于event header中的时间戳)

     1568880718418785.png

    文件前缀被正常替换(基于event header中的hostname:实际主机名)

     1568880729792946.png

    文件内容被写入为event的body

     1568880737291282.png


    收藏文章

    分享

    分享到:QQ空间新浪微博腾讯微博人人网微信
    在线咨询 我要报名
    污污污的视频