1. Introduction
Flume is a distributed system for collecting and moving large volumes of log data, built on a streaming (pipeline) architecture.
(1) Flume has three core concepts: source, channel, and sink.
(2) It supports four deployment patterns: single-agent, chained, consolidation, and multiplexing.
a. Single-agent: one Flume process with one agent (a single pipeline).
b. Chained: several single-agent Flume processes linked in a line, each forwarding to the next.
c. Consolidation: an n-to-n arrangement where multiple Flume agents merge their flows.
d. Multiplexing: one Flume flow fanned out to multiple destinations.
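As an illustration of the multiplexing (fan-out) pattern, one source can be wired to several channels, each drained by its own sink. The agent and component names below are hypothetical, and a replicating selector is used as the simplest fan-out choice:

```
# Hypothetical agent "a1" fanning one source out to two channels/sinks
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# the replicating selector copies every event to both channels
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# each sink drains its own channel toward a different destination
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```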
2. A real-world use case
Requirement: use Flume to transfer historical data from a customer's site to our company's server cluster.
Solution: on the source side I used taildir to monitor matching files under a directory in real time; for the channel I used the memory type (we plan to switch to a file channel later, since the memory channel is fairly memory-hungry); on the sink side I wrote a custom sink class that ships data over HTTP. I also wrote a custom interceptor on the source side to filter the data. The implementation of each step is described below.
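The per-component snippets below configure components named r1, c1, and s1 on an agent named `agent`; they assume the agent declares those components up front, roughly like this:

```
# component declarations assumed by the snippets that follow
agent.sources = r1
agent.channels = c1
agent.sinks = s1
```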
(1) source: taildir

```
# use a TAILDIR-type source
agent.sources.r1.type = TAILDIR
# attach the source to channel c1
agent.sources.r1.channels = c1
# file in which taildir records its read positions
agent.sources.r1.positionFile = ./conf/taildir_position.json
# file groups to monitor
agent.sources.r1.filegroups = f1
# absolute path of file group f1; I set it through an environment variable
agent.sources.r1.filegroups.f1 = ${FILE_PATH}
```
(2) interceptor: custom
The class must implement the Interceptor interface. The packaged jar goes into the lib folder under flume/plugins.d/custome-interceptor; alongside lib there is a libext folder, which holds the third-party jars the custom interceptor depends on.
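The plugins.d layout described above can be created like this (`FLUME_HOME` below is a placeholder for your actual installation path):

```shell
# placeholder installation path; substitute your real Flume home
FLUME_HOME=./flume

# plugins.d/<plugin-name>/lib    holds the plugin jar itself
# plugins.d/<plugin-name>/libext holds its third-party dependencies
mkdir -p "$FLUME_HOME/plugins.d/custome-interceptor/lib"
mkdir -p "$FLUME_HOME/plugins.d/custome-interceptor/libext"

# then copy the built jar in, e.g.:
# cp custom-interceptor.jar "$FLUME_HOME/plugins.d/custome-interceptor/lib/"
```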
```java
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

public class CustomInterceptor implements Interceptor {

    private static final Logger logger = LoggerFactory.getLogger(CustomInterceptor.class);

    public CustomInterceptor() {
        // constructor
    }

    @Override
    public void initialize() {
        // no-op
    }

    @Override
    public Event intercept(Event event) {
        // filtering: return the event if it passes, null to drop it
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = Lists.newArrayList();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {
        // no-op
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public void configure(Context context) {
            // read interceptor parameters from the Flume context here
        }

        @Override
        public Interceptor build() {
            logger.info("Creating CustomInterceptor");
            return new CustomInterceptor();
        }
    }
}
```
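The body of `intercept(Event)` is left as a comment above. As a sketch of what such a filter might check, here is a hypothetical predicate (not the filter actually used) that keeps non-blank lines looking like JSON objects; the real interceptor would call it on the event body and return the event when true, null when false:

```java
public class FilterSketch {

    // Hypothetical predicate: keep non-blank lines that look like JSON objects.
    static boolean accept(String body) {
        if (body == null) {
            return false;
        }
        String s = body.trim();
        return !s.isEmpty() && s.startsWith("{") && s.endsWith("}");
    }

    public static void main(String[] args) {
        System.out.println(accept("{\"id\":1}")); // passes the filter
        System.out.println(accept("   "));        // dropped
    }
}
```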
Configuration:

```
# interceptors
agent.sources.r1.interceptors = i1
# the custom interceptor class
agent.sources.r1.interceptors.i1.type = cn.xxxCustomInterceptor$Builder
```
(3) channel: memory

```
# channels
# use a memory channel
agent.channels.c1.type = memory
# channel capacity; I set it dynamically via an environment variable
agent.channels.c1.capacity = ${CAPACITY}
# max events per transaction from the source or to the sink; also set via environment variable
agent.channels.c1.transactionCapacity = ${TRANS_CAPA}
```
(4) sink: custom
The class extends AbstractSink and implements Configurable; the packaged jar goes into the lib folder under the Flume installation directory.
```java
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(CustomSink.class);

    public void configure(Context context) {
        // read configuration parameters
    }

    @Override
    public void start() {
        // setup before transfers begin
        super.start();
    }

    @Override
    public void stop() {
        // cleanup after transfers end
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // main transfer logic goes here (see section 3 below)
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();
            status = Status.BACKOFF;
        } finally {
            txn.close();
        }
        return status;
    }
}
```
Configuration:

```
# the custom sink class
agent.sinks.s1.type = cn.xxxxx.CustomSink
agent.sinks.s1.channel = c1
# custom configuration parameters
agent.sinks.s1.appid = ${APPID}
agent.sinks.s1.posturl = ${POST_URL}
agent.sinks.s1.intervalDatetime = ${INTERVALDATETIME}
agent.sinks.s1.fileFlag = false
agent.sinks.s1.batchSize = ${BATCH}
```
3. Additional implementation code
sink (the transfer logic inside process()):
```java
try {
    long processedEvent = 0;
    List<String> contents = new ArrayList<>();
    // batch until batchSize lines are collected or intervalDatetime seconds have elapsed
    while (contents.size() < batchSize
            && (System.currentTimeMillis() - pushDatetime) / 1000 < intervalDatetime) {
        Event event = ch.take();
        if (event != null) {
            String line = new String(event.getBody(), "UTF-8");
            if (line.length() > 0) {
                contents.add(line.trim());
                logger.debug(processedEvent + "++++" + line);
            }
        } else {
            logger.debug("null event: " + contents.size());
        }
        logger.debug(this.batchSize + "***for***:" + contents.size());
    }
    pushDatetime = System.currentTimeMillis();
    // ship the batch
    cosumeInfo(contents);
    contents.clear();
    status = Status.READY;
    txn.commit();
} catch (Exception e) {
    txn.rollback();
    status = Status.BACKOFF;
    logger.error("channel was error.", e);
} finally {
    txn.close();
}
```
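The loop condition above mixes a size cap with a time window. Isolated from the Flume machinery, it can be sketched as follows (the names here are illustrative, not from the original code):

```java
public class BatchWindow {

    // Keep filling the batch while it is under the size cap AND still inside
    // the time window (intervalSeconds since the last push).
    static boolean keepFilling(int batched, int batchSize,
                               long startMillis, long nowMillis, long intervalSeconds) {
        return batched < batchSize && (nowMillis - startMillis) / 1000 < intervalSeconds;
    }

    public static void main(String[] args) {
        System.out.println(keepFilling(0, 100, 0, 5_000, 10));   // under both limits
        System.out.println(keepFilling(100, 100, 0, 5_000, 10)); // size cap reached
    }
}
```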
http:
```java
// assemble the batch into a JSON array and send it
private synchronized boolean cosumeInfo(List<String> contents) {
    boolean flag = true;
    if (contents.size() <= 0) {
        return flag;
    }
    StringBuffer stringBuffer = new StringBuffer();
    stringBuffer.append("[");
    for (String item : contents) {
        stringBuffer.append(item.trim()).append(",");
    }
    // drop the trailing comma so the payload is a valid JSON array
    stringBuffer.setLength(stringBuffer.length() - 1);
    stringBuffer.append("]");
    // note: flag ends up false when the send succeeded
    if (defaultSender(stringBuffer.toString())) {
        flag = false;
    }
    return flag;
}

// HTTP transfer
private boolean defaultSender(String data) {
    if (file_flag) {
        writerToFile(data);
    }
    byte[] val1 = encodeRecord(data);
    if (val1 == null) {
        return false;
    }
    StringEntity params = new StringEntity(new String(val1), "UTF-8");
    HttpPost httpPost = null;
    CloseableHttpResponse closeableHttpResponse = null;
    try {
        httpPost = new HttpPost(this.posturl);
        httpPost.setEntity(params);
        httpPost.addHeader("appid", appid);
        httpPost.addHeader("user-agent", "0.1.0");
        closeableHttpResponse = this.httpClient.execute(httpPost);
        if (closeableHttpResponse.getStatusLine().getStatusCode() != 200) {
            throw new IOException(EntityUtils.toString(closeableHttpResponse.getEntity()));
        } else {
            logger.info("data send successful:" + data.length());
        }
    } catch (Exception e) {
        if (httpPost != null) {
            httpPost.abort();
        }
        logger.error("Httpclient execute was error.", e);
        return false;
    } finally {
        if (closeableHttpResponse != null) {
            try {
                closeableHttpResponse.close();
            } catch (IOException e) {
                logger.error("error releasing the connection:", e);
            }
        }
    }
    return true;
}

// data compression: gzip, then Base64
private byte[] encodeRecord(String data) {
    ByteArrayOutputStream byteArrayBuffer = new ByteArrayOutputStream();
    try {
        GZIPOutputStream gzip = new GZIPOutputStream(byteArrayBuffer);
        gzip.write(data.getBytes(StandardCharsets.UTF_8));
        gzip.close();
    } catch (IOException e) {
        logger.error("GZIP compress with exception", e);
        return null;
    }
    return Base64.encodeBase64(byteArrayBuffer.toByteArray());
}
```
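encodeRecord() gzips the UTF-8 bytes and then Base64-encodes them, so a receiving server would apply the inverse. Here is a sketch of the round trip using `java.util.Base64` (the original uses commons-codec's Base64, which produces a compatible encoding); error handling is collapsed to a null return for brevity:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class RecordCodec {

    // Mirrors encodeRecord(): gzip the UTF-8 bytes, then Base64-encode.
    static byte[] encode(String data) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write(data.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            return null;
        }
        return Base64.getEncoder().encode(buf.toByteArray());
    }

    // Hypothetical server-side inverse: Base64-decode, then gunzip.
    static String decode(byte[] payload) {
        byte[] gzipped = Base64.getDecoder().decode(payload);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(gzipped))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = gz.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        } catch (IOException e) {
            return null;
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```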