I have a question: how do I set two interceptors for a channel?

Here is my configuration:

# configuration start
tier1.sources.KafkaSource.channels = c1 c2
tier1.sources.KafkaSource.interceptors = i1 i2
tier1.sources.KafkaSource.interceptors.i1.type = host
tier1.sources.KafkaSource.interceptors.i1.useIP = true

tier1.sources.KafkaSource.interceptors.i2.type = org.apache.flume.interceptor.My$Builder
tier1.sources.KafkaSource.interceptors.i2.key = logType
tier1.sources.KafkaSource.interceptors.i2.value = app

tier1.sources.KafkaSource.selector.type = multiplexing
tier1.sources.KafkaSource.selector.header = logType
tier1.sources.KafkaSource.selector.mapping.app = c1
tier1.sources.KafkaSource.selector.default = c2



The JSON-format events from the source KafkaSource are processed by the
interceptor My$Builder, which splits them into two groups: those with
logType=app and those with logType != app.

For example:

{"logType":"app","id":1,"env":"test"}
{"logType":"phone","id":2}
{"logType":"phone","id":3,"env":"test"}

The record with id=1 is sent to channel c1, and the records with id=2 and
id=3 are sent to channel c2.

So my question is: what new interceptor can I add so that a record with
logType=app and env=test is sent to channel c2?

My idea is to append the following interceptor:

tier1.sources.KafkaSource.interceptors.i3.type = org.apache.flume.interceptor.Myr$Builder
tier1.sources.KafkaSource.interceptors.i3.key = env
tier1.sources.KafkaSource.interceptors.i3.value = test

But it still makes the record with id=2 go to channel c2 too.
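
For completeness, the i3 block above assumes that i3 is also named in the source's interceptor list, since Flume only builds the interceptors listed there and runs them in the listed order. A minimal sketch of that line, replacing the two-entry list from my configuration (nothing else here is new):

```properties
# Assumption: i3 is appended after i2, so it sees the headers i2 has already set.
tier1.sources.KafkaSource.interceptors = i1 i2 i3
```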

Please help me figure this out.

I expect interceptor i3 to consume only the part of i2’s output with logType=app.

Thanks 
