博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flume 自定义sink
阅读量:7081 次
发布时间:2019-06-28

本文共 2467 字,大约阅读时间需要 8 分钟。


http://flume.apache.org/FlumeDeveloperGuide.html#sink


看了 还是比较好上手的,简单翻译一下


sink的作用是从 Channel 提取 Event 然后传给流中的下一个 Flume Agent或者把它们存储在外部的仓库中。在Flume的配置文件中,一个 Sink 和一个唯一的 Channel 关联。有一个 SinkRunner 实例与每一个配好的 Sink 关联,当 Flume 框架调用 SinkRunner 的 start() 方法时,就创建一个新的线程来驱动这个 Sink (使用  SinkRunner 的实现Runnable接口的 PollingRunner 内部静态类来运行)。这个线程管理了 Sink 的生命周期。 Sink 需要实现 start() 和 stop() 方法。Sink 的 start() 方法需要初始化 Sink 并使它能够达到向目的地发送 Event 的状态。 Sink 的 process() 方法是处理从 Channel 传过来的 Event 和 发送 Event 的核心方法。 Sink 的 Stop() 方法需要做必要的清理工作(比如释放某些资源)。 Sink 也需要实现 Configurable 接口来处理自己的一些配置。


官网也给出了模板类:

1 public class MySink extends AbstractSink implements Configurable { 2     private String myProp; 3  4     @Override 5     public void configure(Context context) { 6         String myProp = context.getString("myProp", "defaultValue"); 7  8         // Process the myProp value (e.g. validation) 9 10         // Store myProp for later retrieval by process() method11         this.myProp = myProp;12     }13 14     @Override15     public void start() {16         // Initialize the connection to the external repository (e.g. HDFS) that17         // this Sink will forward Events to ..18     }19 20     @Override21     public void stop() {22         // Disconnect from the external respository and do any23         // additional cleanup (e.g. releasing resources or nulling-out24         // field values) ..25     }26 27     @Override28     public Status process() throws EventDeliveryException {29         Status status = null;30 31         // Start transaction32         Channel ch = getChannel();33         Transaction txn = ch.getTransaction();34         txn.begin();35 36         try {37             // This try clause includes whatever Channel operations you want to do38             Event event = ch.take();39 40             // Send the Event to the external repository.41             // storeSomeData(e);42             txn.commit();43             status = Status.READY;44         } catch (Throwable t) {45             txn.rollback();46 47             // Log exception, handle individual exceptions as needed48             status = Status.BACKOFF;49 50             // re-throw all Errors51             if (t instanceof Error) {52                 throw (Error) t;53             }54         } finally {55             txn.close();56         }57 58         return status;59     }60 }

 


拿来模板直接填充自己的逻辑代码即可,详细就可以直接参考HDFSSink或者HBaseSink等


 

转载于:https://www.cnblogs.com/admln/p/flume-user-defined-sink.html

你可能感兴趣的文章
安装 CentOS 时找不到硬盘的解决办法
查看>>
Java中的访问控制public,private,protected,package
查看>>
Foxmail 6.5在Windwos 7下无法编辑签名
查看>>
Putty 连接Centos服务器
查看>>
Active Diretory 目录服务相关命令
查看>>
建立属于自己的Cydia源,并获取cydia中的deb安装包,cyder不报错汉化版
查看>>
python列表中任意两个元素交换
查看>>
中国Linux联盟 - 圣诞节寄语
查看>>
Redis 的 master/slave 复制
查看>>
Java_Java面向对象核心总结
查看>>
MDT配置文件BootStrap.ini和CustomSettings.ini
查看>>
>>>运算符
查看>>
think it clear: const, char, format
查看>>
45.将国内版AWS的虚拟机迁移到国内版Azure(下)
查看>>
office办公中文字的处理与排版
查看>>
windows server 2003的安装简介
查看>>
Restlet - REST架构风格的介绍
查看>>
我的友情链接
查看>>
三维观察---OpenGL任选裁剪平面
查看>>
discuz管理中心登录的时候闪退
查看>>