Flume自定义过滤器插件

冯旭光 Lv4

Flume 是一种可扩展的、可靠的分布式日志收集和聚合系统。它通过拦截器（Interceptor）对数据流中的事件进行处理和过滤，以满足不同的需求。Flume 提供了很多内置的拦截器，我们也可以通过自定义拦截器来实现特定的数据过滤和处理逻辑。

一、引入依赖

新建一个 Maven 工程，引入如下依赖：

<!-- Flume core API (Interceptor, Event, Context). Scope "provided": the built jar is copied into
     flume/lib, and the Flume installation already ships flume-ng-core there. -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.11.0</version>
    <scope>provided</scope>
</dependency>
<!-- Lombok: needed at compile time because the interceptor uses @Slf4j (generates the `log` field). -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.30</version>
    <scope>provided</scope>
</dependency>

二、自定义拦截器类

package com.bda.dcp.flume.interceptor;

import com.google.common.base.Splitter;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
*
* Created by fengxuguang on 2024/6/14 15:32
*/
@Slf4j
public class JsonInterceptor implements Interceptor {

private String jsonTopic;

public JsonInterceptor(String jsonTopic) {
this.jsonTopic = jsonTopic;
System.out.println("======> keyword = " + jsonTopic);
}

@Override
public void initialize() {

}

@Override
public Event intercept(Event event) {
try {
String body = new String(event.getBody());

Gson gson = new Gson();
Map<String, String> bodyJson = gson.fromJson(body, Map.class);
log.info("=====> body json: {}", bodyJson);

// 根据逗号分割配置的字符串,转换成 List<String> 集合
List<String> columns = new ArrayList<>();
if (StringUtils.isNotBlank(jsonTopic)) {
Iterable<String> split = Splitter.on(",")
.omitEmptyStrings()
.trimResults()
.split(jsonTopic);
split.forEach(s -> columns.add(s));
}
Map<String, String> result = new HashMap<>();
for (String key : bodyJson.keySet()) {
// 过滤出事件 body 内的数据在配置项中指定的字段
if (columns.contains(key)) {
result.put(key, bodyJson.get(key));
}
}

event.setBody(gson.toJson(result).getBytes());
return event;
} catch (Exception e) {
log.error(e.getMessage(), e);
return null;
}
}

@Override
public List<Event> intercept(List<Event> list) {
List<Event> resultList = Lists.newArrayList();
for (Event event : list) {
Event result = intercept(event);
if (result != null) {
resultList.add(result);
}
}
return resultList;
}

@Override
public void close() {

}

public static class Builder implements Interceptor.Builder {

private String jsonTopic;

@Override
public Interceptor build() {
return new JsonInterceptor(jsonTopic);
}

@Override
public void configure(Context context) {
jsonTopic = context.getString("jsonTopic");
}
}

}

三、打包

执行 `mvn package` 将工程打包成 jar 包，然后把生成的 jar 包放入 Flume 安装目录下的 lib/ 目录中。

四、配置

# Agent a1: Kafka source (topic flume-source-kafka) -> Kafka channel (topic flume-channel)
# -> Kafka sink (topic flume-collect-kafka), with a custom interceptor that filters JSON fields.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source r1: consume events from Kafka topic flume-source-kafka.
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 3
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.162.11.25:9092
a1.sources.r1.kafka.topics = flume-source-kafka
# Interceptors on this source: i1 (static header) and i2 (custom JSON field filter).
a1.sources.r1.interceptors = i1 i2
# i1: static interceptor that sets the event header "topic" to flume-collect-kafka,
# overwriting any existing value (preserveExisting = false).
# NOTE(review): presumably this is so the Kafka sink publishes to flume-collect-kafka instead of
# a topic carried in the "topic" header — confirm against the Flume Kafka source/sink docs.
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = topic
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.value = flume-collect-kafka
# Custom interceptor. The type must be: <fully qualified class name>$<nested class name> (the Builder).
a1.sources.r1.interceptors.i2.type = com.bda.dcp.flume.interceptor.JsonInterceptor$Builder
# Fields to keep, comma-separated. Despite the key name "jsonTopic", this is a list of JSON fields,
# not a Kafka topic.
a1.sources.r1.interceptors.i2.jsonTopic=name,age

# Sink k1: publish events from channel c1 to Kafka topic flume-collect-kafka.
# NOTE(review): the sink's broker (192.162.11.191) differs from the source/channel broker
# (192.162.11.25) — confirm this is intended.
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.flumeBatchSize = 3
a1.sinks.k1.kafka.bootstrap.servers = 192.162.11.191:9092
a1.sinks.k1.kafka.topic = flume-collect-kafka
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Channel c1: Kafka-backed channel stored in topic flume-channel.
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 192.162.11.25:9092
a1.channels.c1.kafka.topic = flume-channel
a1.channels.c1.kafka.consumer.auto.offset.reset = latest
  • 标题: Flume自定义过滤器插件
  • 作者: 冯旭光
  • 创建于 : 2024-06-18 15:08:39
  • 更新于 : 2025-03-11 14:43:45
  • 链接: https://blog.fengxuguang.top/posts/3e52edfc/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论