Please find the configuration file inline below:
# List the sources, sinks and channels in the agent.
# Each key must start on its own line; a key that trails a '#' comment
# on the same line is treated as part of the comment and never loaded.
agent_foo.sources = hdfs-Cluster1-source
agent_foo.sinks = hdfs-Cluster1-sink1 hdfs-Cluster1-sink2 hdfs-Cluster1-sink3 hdfs-Cluster1-sink4
agent_foo.channels = file-channel-1 file-channel-2 file-channel-3
# Set channels for the source: it can write to any of the three file channels.
agent_foo.sources.hdfs-Cluster1-source.channels = file-channel-1 file-channel-2 file-channel-3
# Describe/configure the source: a Kafka consumer reading topic "test3"
# through ZooKeeper, as consumer group "flume".
agent_foo.sources.hdfs-Cluster1-source.type = org.apache.flume.source.kafka.KafkaSource
agent_foo.sources.hdfs-Cluster1-source.zookeeperConnect = 10.32.22.67:2181
agent_foo.sources.hdfs-Cluster1-source.topic = test3
agent_foo.sources.hdfs-Cluster1-source.groupId = flume
agent_foo.sources.hdfs-Cluster1-source.kafka.auto.offset.reset = smallest
agent_foo.sources.hdfs-Cluster1-source.batchSize = 5
agent_foo.sources.hdfs-Cluster1-source.kafka.consumer.timeout.ms = 100
# File channel 1 (drained by hdfs-Cluster1-sink1 and hdfs-Cluster1-sink4).
# NOTE(review): backupCheckpointDir is presumably only used when
# useDualCheckpoints is enabled, which this config does not set - confirm.
agent_foo.channels.file-channel-1.type = file
agent_foo.channels.file-channel-1.checkpointDir = /home/dheeraj.rokade/flume/.flume/file-channel1/checkpoint
agent_foo.channels.file-channel-1.backupCheckpointDir = /home/dheeraj.rokade/flume/.flume/file-channel1/backup
agent_foo.channels.file-channel-1.dataDirs = /home/dheeraj.rokade/flume/.flume/file-channel1/data
agent_foo.channels.file-channel-1.transactionCapacity = 205
agent_foo.channels.file-channel-1.checkpointInterval = 3000
agent_foo.channels.file-channel-1.capacity = 10000
agent_foo.channels.file-channel-1.keep-alive = 5
agent_foo.channels.file-channel-1.use-fast-replay = false
# File channel 2 (drained by hdfs-Cluster1-sink2).
# NOTE(review): backupCheckpointDir is presumably only used when
# useDualCheckpoints is enabled, which this config does not set - confirm.
agent_foo.channels.file-channel-2.type = file
agent_foo.channels.file-channel-2.checkpointDir = /home/dheeraj.rokade/flume/.flume/file-channel2/checkpoint
agent_foo.channels.file-channel-2.backupCheckpointDir = /home/dheeraj.rokade/flume/.flume/file-channel2/backup
agent_foo.channels.file-channel-2.dataDirs = /home/dheeraj.rokade/flume/.flume/file-channel2/data
agent_foo.channels.file-channel-2.transactionCapacity = 25
agent_foo.channels.file-channel-2.checkpointInterval = 300
agent_foo.channels.file-channel-2.capacity = 100
agent_foo.channels.file-channel-2.keep-alive = 5
agent_foo.channels.file-channel-2.use-fast-replay = false
# File channel 3 (drained by hdfs-Cluster1-sink3; also the selector default).
# NOTE(review): backupCheckpointDir is presumably only used when
# useDualCheckpoints is enabled, which this config does not set - confirm.
agent_foo.channels.file-channel-3.type = file
agent_foo.channels.file-channel-3.checkpointDir = /home/dheeraj.rokade/flume/.flume/file-channel3/checkpoint
agent_foo.channels.file-channel-3.backupCheckpointDir = /home/dheeraj.rokade/flume/.flume/file-channel3/backup
agent_foo.channels.file-channel-3.dataDirs = /home/dheeraj.rokade/flume/.flume/file-channel3/data
agent_foo.channels.file-channel-3.transactionCapacity = 25
agent_foo.channels.file-channel-3.checkpointInterval = 300
agent_foo.channels.file-channel-3.capacity = 100
agent_foo.channels.file-channel-3.keep-alive = 5
agent_foo.channels.file-channel-3.use-fast-replay = false
# Set the channel each sink drains. sink1 and sink4 both read from
# file-channel-1, so events on that channel are split between the two sinks.
agent_foo.sinks.hdfs-Cluster1-sink1.channel = file-channel-1
agent_foo.sinks.hdfs-Cluster1-sink2.channel = file-channel-2
agent_foo.sinks.hdfs-Cluster1-sink3.channel = file-channel-3
agent_foo.sinks.hdfs-Cluster1-sink4.channel = file-channel-1
# Regex-extractor interceptor: copies the first two captured groups of the
# event body into headers "val1" and "col2" (val1 drives the channel selector).
agent_foo.sources.hdfs-Cluster1-source.interceptors = i1
agent_foo.sources.hdfs-Cluster1-source.interceptors.i1.type = regex_extractor
# The pipes must be escaped. An unescaped '|' is regex alternation, so
# "(\w+)|(\w+)|" only ever matches its first alternative; group 2 stays
# unmatched (null), the "col2" header is written with a null value, and the
# file channel fails with a NullPointerException in
# FlumeEventHeader$Builder.setValue when serializing the event.
# Backslashes are doubled because the properties loader unescapes them once.
# NOTE(review): assumes event bodies look like "field1|field2|..." - confirm
# against the actual Kafka messages.
agent_foo.sources.hdfs-Cluster1-source.interceptors.i1.regex = (\\w+)\\|(\\w+)\\|
#agent_foo.sources.hdfs-Cluster1-source.interceptors.i1.serializers.delimiter = :
#agent_foo.sources.hdfs-Cluster1-source.interceptors.i1.regex = (\\d).*
agent_foo.sources.hdfs-Cluster1-source.interceptors.i1.serializers = s1 s2
agent_foo.sources.hdfs-Cluster1-source.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
agent_foo.sources.hdfs-Cluster1-source.interceptors.i1.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
agent_foo.sources.hdfs-Cluster1-source.interceptors.i1.serializers.s1.name = val1
agent_foo.sources.hdfs-Cluster1-source.interceptors.i1.serializers.s2.name = col2
# Channel selector configuration: route each event by the value of its
# "val1" header; values with no mapping go to the default channel.
agent_foo.sources.hdfs-Cluster1-source.selector.type = multiplexing
agent_foo.sources.hdfs-Cluster1-source.selector.header = val1
agent_foo.sources.hdfs-Cluster1-source.selector.mapping.I = file-channel-1
agent_foo.sources.hdfs-Cluster1-source.selector.mapping.s_case = file-channel-2
agent_foo.sources.hdfs-Cluster1-source.selector.mapping.s_user = file-channel-3
agent_foo.sources.hdfs-Cluster1-source.selector.default = file-channel-3
# HDFS sink 1: writes events from file-channel-1 as plain text.
agent_foo.sinks.hdfs-Cluster1-sink1.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink1.hdfs.path = hdfs://10.32.22.65:8020/user/dheeraj.rokade/flume_test1
# HDFS sink settings live under the "hdfs." prefix; a bare "rollInterval"
# key is not read by the sink, leaving the default roll interval in effect.
agent_foo.sinks.hdfs-Cluster1-sink1.hdfs.rollInterval = 10
agent_foo.sinks.hdfs-Cluster1-sink1.hdfs.writeFormat = Text
agent_foo.sinks.hdfs-Cluster1-sink1.hdfs.fileType = DataStream
agent_foo.sinks.hdfs-Cluster1-sink1.hdfs.batchSize = 5
# HDFS sink 2: writes events from file-channel-2 as plain text.
agent_foo.sinks.hdfs-Cluster1-sink2.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink2.hdfs.path = hdfs://10.32.22.65:8020/user/dheeraj.rokade/flume_test2
# HDFS sink settings live under the "hdfs." prefix; a bare "rollInterval"
# key is not read by the sink, leaving the default roll interval in effect.
agent_foo.sinks.hdfs-Cluster1-sink2.hdfs.rollInterval = 10
agent_foo.sinks.hdfs-Cluster1-sink2.hdfs.writeFormat = Text
agent_foo.sinks.hdfs-Cluster1-sink2.hdfs.fileType = DataStream
agent_foo.sinks.hdfs-Cluster1-sink2.hdfs.batchSize = 5
# HDFS sink 3: writes events from file-channel-3 as plain text.
agent_foo.sinks.hdfs-Cluster1-sink3.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink3.hdfs.path = hdfs://10.32.22.65:8020/user/dheeraj.rokade/flume_test3
# HDFS sink settings live under the "hdfs." prefix; a bare "rollInterval"
# key is not read by the sink, leaving the default roll interval in effect.
agent_foo.sinks.hdfs-Cluster1-sink3.hdfs.rollInterval = 10
agent_foo.sinks.hdfs-Cluster1-sink3.hdfs.writeFormat = Text
agent_foo.sinks.hdfs-Cluster1-sink3.hdfs.fileType = DataStream
agent_foo.sinks.hdfs-Cluster1-sink3.hdfs.batchSize = 5
# HDFS sink 4: second consumer of file-channel-1, writing to its own path.
agent_foo.sinks.hdfs-Cluster1-sink4.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink4.hdfs.path = hdfs://10.32.22.65:8020/user/dheeraj.rokade/flume_test4
# HDFS sink settings live under the "hdfs." prefix; a bare "rollInterval"
# key is not read by the sink, leaving the default roll interval in effect.
agent_foo.sinks.hdfs-Cluster1-sink4.hdfs.rollInterval = 10
agent_foo.sinks.hdfs-Cluster1-sink4.hdfs.writeFormat = Text
agent_foo.sinks.hdfs-Cluster1-sink4.hdfs.fileType = DataStream
agent_foo.sinks.hdfs-Cluster1-sink4.hdfs.batchSize = 5
From: Balazs Donat Bessenyei <[email protected]>
To: [email protected]; "[email protected]" <[email protected]>
Cc: Attila Simon <[email protected]>
Sent: Monday, 17 October 2016 6:58 PM
Subject: Re: Flume channel error : Unable to put batch on required channel:
FileChannel file-channel-1
As far as I know, apache.org mailing lists "ignore" attachments.
You should either upload them to some service like
https://gist.github.com/ or inline the relevant parts.
Btw, when do you get these errors? When starting Flume or after some time?
Thank you,
Donat
On Mon, Oct 17, 2016 at 7:51 PM, Dheeraj Rokade
<[email protected]> wrote:
> Please find the configuration file attached
>
> Thanks
> Dheeraj
>
> Sent from Yahoo Mail on Android
>
> On Mon, 17 Oct, 2016 at 18:23, Attila Simon
> <[email protected]> wrote:
> Hi,
> It might've been my mobile client for gmail but couldn't find the flume
> conf attached. Could you please double check?
> Cheers,
> Attila
>
> On Monday, 17 October 2016, Dheeraj Rokade <[email protected]>
> wrote:
>
>> Hello Team,
>>
>> My development activity is stuck due to below error in the Apache agent.
>>
>> *org.apache.flume.ChannelException: Unable to put batch on required
>> channel: FileChannel file-channel-1 *
>>
>> I have the following configuration
>> One Source - *Kafka*
>> Three channels - *File channel*
>> Four Sinks - *HDFS Sinks*
>>
>> Attached is the flume agent config file and below is the error message
>>
>> Things tried
>> 1) Lowering or increasing the batch size for Kafka source and Hive HDFS
>> sink
>> 2) Lowering or increasing the TransactionCapacity and Capacity for File
>> channel
>> 3) With some help from the net, tried keeping File channel
>> TransactionCapacity to be 5x of Kafka and HDFS Batch size but in vain.
>>
>> Please help advise
>>
>> Error message in detail :
>> 16/10/17 18:06:08 ERROR kafka.KafkaSource: KafkaSource EXCEPTION, {}
>> org.apache.flume.ChannelException: Unable to put batch on required
>> channel: FileChannel file-channel-1 { dataDirs:
>> [/home/dheeraj.rokade/flume/.flume/file-channel1/data] }
>> at org.apache.flume.channel.ChannelProcessor.processEventBatch(
>> ChannelProcessor.java:200)
>> at org.apache.flume.source.kafka.KafkaSource.process(
>> KafkaSource.java:123)
>> at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(
>> PollableSourceRunner.java:139)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>> at org.apache.flume.channel.file.proto.ProtosFactory$
>> FlumeEventHeader$Builder.setValue(ProtosFactory.java:7415)
>> at org.apache.flume.channel.file.Put.writeProtos(Put.java:85)
>> at org.apache.flume.channel.file.TransactionEventRecord.
>> toByteBuffer(TransactionEventRecord.java:174)
>> at org.apache.flume.channel.file.Log.put(Log.java:642)
>> at org.apache.flume.channel.file.FileChannel$
>> FileBackedTransaction.doPut(FileChannel.java:468)
>> at org.apache.flume.channel.BasicTransactionSemantics.put(
>> BasicTransactionSemantics.java:93)
>> at org.apache.flume.channel.BasicChannelSemantics.put(
>> BasicChannelSemantics.java:80)
>> at org.apache.flume.channel.ChannelProcessor.processEventBatch(
>> ChannelProcessor.java:189)
>> ... 3 more
>> 16/10/17 18:06:13 ERROR kafka.KafkaSource: KafkaSource EXCEPTION, {}
>> org.apache.flume.ChannelException: Unable to put batch on required
>> channel: FileChannel file-channel-1 { dataDirs:
>> [/home/dheeraj.rokade/flume/.flume/file-channel1/data] }
>> at org.apache.flume.channel.ChannelProcessor.processEventBatch(
>> ChannelProcessor.java:200)
>> at org.apache.flume.source.kafka.KafkaSource.process(
>> KafkaSource.java:123)
>> at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(
>> PollableSourceRunner.java:139)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>> at org.apache.flume.channel.file.proto.ProtosFactory$
>> FlumeEventHeader$Builder.setValue(ProtosFactory.java:7415)
>> at org.apache.flume.channel.file.Put.writeProtos(Put.java:85)
>> at org.apache.flume.channel.file.TransactionEventRecord.
>> toByteBuffer(TransactionEventRecord.java:174)
>> at org.apache.flume.channel.file.Log.put(Log.java:642)
>> at org.apache.flume.channel.file.FileChannel$
>> FileBackedTransaction.doPut(FileChannel.java:468)
>> at org.apache.flume.channel.BasicTransactionSemantics.put(
>> BasicTransactionSemantics.java:93)
>> at org.apache.flume.channel.BasicChannelSemantics.put(
>> BasicChannelSemantics.java:80)
>> at org.apache.flume.channel.ChannelProcessor.processEventBatch(
>> ChannelProcessor.java:189)
>> ... 3 more
>
>>
>>
>
> --
>
> *Attila Simon*
> Software Engineer
> Email: [email protected]
>
> [image: Cloudera Inc.]
>