Flume is a scalable, reliable, distributed system for collecting and aggregating log data. It uses interceptors to process and filter the event stream to meet different needs. Flume ships with many built-in interceptors, but we can also write a custom interceptor to implement our own filtering and processing.
1. Add the dependency
Create a new project and add the following dependency:
```xml
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.11.0</version>
</dependency>
```
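Two hedged additions to the pom: flume-ng-core (and its transitive dependencies) is already on the Flume agent's runtime classpath, so it can be marked `provided`; and the interceptor class below uses Lombok's `@Slf4j`, which flume-ng-core does not pull in, so a compile-time Lombok dependency is needed as well (the Lombok version here is an assumption):

```xml
<!-- Compile-time only: supplies the @Slf4j annotation used by the interceptor. -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.30</version>
    <scope>provided</scope>
</dependency>
```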
2. The custom interceptor class
```java
package com.bda.dcp.flume.interceptor;

import com.google.common.base.Splitter;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Parses each event body as a JSON object and keeps only the keys listed
 * in the interceptor's "jsonTopic" property (a comma-separated whitelist).
 */
@Slf4j
public class JsonInterceptor implements Interceptor {

    private final String jsonTopic;

    public JsonInterceptor(String jsonTopic) {
        this.jsonTopic = jsonTopic;
        log.info("======> keyword = {}", jsonTopic);
    }

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        try {
            String body = new String(event.getBody(), StandardCharsets.UTF_8);
            Gson gson = new Gson();
            Map<String, String> bodyJson = gson.fromJson(body, Map.class);
            log.info("=====> body json: {}", bodyJson);

            // Split the comma-separated whitelist, e.g. "name,age" -> [name, age].
            List<String> columns = new ArrayList<>();
            if (StringUtils.isNotBlank(jsonTopic)) {
                Splitter.on(",")
                        .omitEmptyStrings()
                        .trimResults()
                        .split(jsonTopic)
                        .forEach(columns::add);
            }

            // Keep only the whitelisted keys from the parsed body.
            Map<String, String> result = new HashMap<>();
            for (String key : bodyJson.keySet()) {
                if (columns.contains(key)) {
                    result.put(key, bodyJson.get(key));
                }
            }

            event.setBody(gson.toJson(result).getBytes(StandardCharsets.UTF_8));
            return event;
        } catch (Exception e) {
            // Returning null drops the event (e.g. when the body is not valid JSON).
            log.error(e.getMessage(), e);
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> resultList = new ArrayList<>();
        for (Event event : list) {
            Event result = intercept(event);
            if (result != null) {
                resultList.add(result);
            }
        }
        return resultList;
    }

    @Override
    public void close() {
    }

    /**
     * Flume instantiates interceptors through this builder; the "jsonTopic"
     * property is read from the agent configuration file.
     */
    public static class Builder implements Interceptor.Builder {

        private String jsonTopic;

        @Override
        public Interceptor build() {
            return new JsonInterceptor(jsonTopic);
        }

        @Override
        public void configure(Context context) {
            jsonTopic = context.getString("jsonTopic");
        }
    }
}
```
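Before deploying, the interceptor can be exercised locally without a running agent. A minimal smoke-test sketch; the class name and sample JSON body are illustrative, not from the original:

```java
package com.bda.dcp.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

public class JsonInterceptorSmokeTest {
    public static void main(String[] args) {
        // Configure the builder the same way Flume would from the agent file.
        Context context = new Context();
        context.put("jsonTopic", "name,age");

        Interceptor.Builder builder = new JsonInterceptor.Builder();
        builder.configure(context);
        Interceptor interceptor = builder.build();
        interceptor.initialize();

        // Fields other than "name" and "age" should be stripped from the body.
        Event event = EventBuilder.withBody(
                "{\"name\":\"tom\",\"age\":\"18\",\"city\":\"beijing\"}"
                        .getBytes(StandardCharsets.UTF_8));
        Event result = interceptor.intercept(event);
        System.out.println(new String(result.getBody(), StandardCharsets.UTF_8));
        // Expected: {"name":"tom","age":"18"} (key order may vary, HashMap backed).

        interceptor.close();
    }
}
```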
3. Packaging
Build the jar with mvn package, then copy it into Flume's flume/lib/ directory.
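flume/lib/ already contains flume-ng-core and its transitive dependencies (Guava, Gson, commons-lang and friends), so with those marked `provided` a plain jar is usually enough. If the interceptor picks up extra runtime libraries, one option is to bundle them with the maven-shade-plugin; a minimal sketch, where the plugin version is an assumption:

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.4.1</version>
            <executions>
                <execution>
                    <!-- Bundle runtime dependencies into the jar during "mvn package". -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```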
4. Configuration
```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 3
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.162.11.25:9092
a1.sources.r1.kafka.topics = flume-source-kafka
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = topic
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.value = flume-collect-kafka

a1.sources.r1.interceptors.i2.type = com.bda.dcp.flume.interceptor.JsonInterceptor$Builder
a1.sources.r1.interceptors.i2.jsonTopic = name,age

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.flumeBatchSize = 3
a1.sinks.k1.kafka.bootstrap.servers = 192.162.11.191:9092
a1.sinks.k1.kafka.topic = flume-collect-kafka
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 192.162.11.25:9092
a1.channels.c1.kafka.topic = flume-channel
a1.channels.c1.kafka.consumer.auto.offset.reset = latest
```
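With jsonTopic = name,age, the i2 interceptor rewrites every event body to contain only those two keys. An illustrative before/after (sample data, not from the original):

```text
incoming body:              {"name":"tom","age":"18","city":"beijing"}
body after the interceptor: {"name":"tom","age":"18"}
```

Note that the output key order may vary, since the interceptor collects the surviving fields into a HashMap before serializing.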