[ https://issues.apache.org/jira/browse/FLUME-3040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
prayagupd updated FLUME-3040: ----------------------------- Description: I'm getting following Failed to send batch error at NettyAvroRpcClient#appendBatch, https://github.com/apache/flume/blob/d9c9a7dd9a6889ecf6b9dc88fb8e02ccc1cd5167/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java#L314 {code} org.apache.flume.EventDeliveryException: Failed to send events at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:785) Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { logagg.prod.aws.cloud.divinen.net, port: 443 }: Failed to send batch at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:320) at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:373) ... 3 more Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: logagg.prod.aws.cloud.divinen.net, port: 443 }: Exception thrown from remote handler at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:402) at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:379) at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:308) ... 4 more Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128) at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:394) ... 6 more Caused by: java.lang.ArrayIndexOutOfBoundsException at org.jboss.netty.util.internal.jzlib.Tree.d_code(Tree.java:127) at org.jboss.netty.util.internal.jzlib.Deflate.compress_block(Deflate.java:637) at org.jboss.netty.util.internal.jzlib.Deflate._tr_flush_block(Deflate.java:860) at org.jboss.netty.util.internal.jzlib.Deflate.flush_block_only(Deflate.java:724) at org.jboss.netty.util.internal.jzlib.Deflate.deflate_slow(Deflate.java:1168) at org.jboss.netty.util.internal.jzlib.Deflate.deflate(Deflate.java:1601) at org.jboss.netty.util.internal.jzlib.ZStream.deflate(ZStream.java:140) at org.jboss.netty.handler.codec.compression.ZlibEncoder.encode(ZlibEncoder.java:282) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:66) at org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder.doEncode(OneToOneStrictEncoder.java:35) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) at org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:322) at org.jboss.netty.channel.Channels.write(Channels.java:725) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) at org.jboss.netty.channel.Channels.write(Channels.java:704) at org.jboss.netty.channel.Channels.write(Channels.java:671) at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:248) at org.apache.avro.ipc.NettyTransceiver.writeDataPack(NettyTransceiver.java:515) at org.apache.avro.ipc.NettyTransceiver.transceive(NettyTransceiver.java:476) at org.apache.avro.ipc.Requestor.request(Requestor.java:147) at org.apache.avro.ipc.Requestor.request(Requestor.java:129) at org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:84) at com.sun.proxy.$Proxy29.appendBatch(Unknown Source) at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:353) at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:1) at java.util.concurrent.FutureTask.run(FutureTask.java:277) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 1 more {code} My flume config is, {code} client.sources=source_1 source_2 src-flume-log src-flume-metrics client.channels = file-channel-1 file-metrics-1 file-flumelog-1 client.sinks = avro-sink-1 avro-metrics-sink-1 avro-flumelog-sink-1 #source definations client.sources.source_1.type=org.apache.flume.source.taildir.TaildirSource client.sources.source_1.filegroups = f1 client.sources.source_1.fileHeader = true client.sources.source_1.filegroups.f1 = /u01/app/Logs/microservice1/logging12.log client.sources.source_1.idleTimeout=3000 client.sources.source_1.batchSize=100 client.sources.source_1.channels=file-channel-1 client.sources.source_1.multiline=false client.sources.source_1.bufferSize=1048576 client.sources.source_1.positionFile=/u01/app/Flume/offsets/offset_microservice1.json client.sources.source_1.interceptors = filterLogsWithAppId cleanupEnricher logEnricher removeEmptyElems addFields client.sources.source_1.interceptors.filterLogsWithAppId.type = regex_filter client.sources.source_1.interceptors.filterLogsWithAppId.regex = .*applicationId=MicroService1.* client.sources.source_1.interceptors.filterLogsWithAppId.excludeEvents = false client.sources.source_1.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder client.sources.source_1.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder client.sources.source_1.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder client.sources.source_1.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder client.sources.source_1.interceptors.addFields.keyList=appCategory|appName|logEnv|logType client.sources.source_1.interceptors.addFields.valueList=x|y|prod|microservice1 client.sources.source_2.type=org.apache.flume.source.taildir.TaildirSource client.sources.source_2.filegroups = f1 client.sources.source_2.fileHeader = true client.sources.source_2.filegroups.f1 = /u01/app/Logs/microservice2/logging12.log client.sources.source_2.idleTimeout=3000 client.sources.source_2.batchSize=100 client.sources.source_2.channels=file-channel-1 client.sources.source_2.multiline=false client.sources.source_2.bufferSize=1048576 client.sources.source_2.positionFile=/u01/app/Flume/offsets/offset_microservice2.json client.sources.source_2.interceptors = filterLogsWithAppId cleanupEnricher logEnricher removeEmptyElems addFields client.sources.source_2.interceptors.filterLogsWithAppId.type = regex_filter client.sources.source_2.interceptors.filterLogsWithAppId.regex = .*applicationId=MicroService2.* client.sources.source_2.interceptors.filterLogsWithAppId.excludeEvents = false client.sources.source_2.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder client.sources.source_2.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder client.sources.source_2.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder client.sources.source_2.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder client.sources.source_2.interceptors.addFields.keyList=appCategory|appName|logEnv|logType client.sources.source_2.interceptors.addFields.valueList=x|y|prod|microservice2 client.sources.src-flume-log.type=org.apache.flume.source.taildir.TaildirSource client.sources.src-flume-log.filegroups = f1 client.sources.src-flume-log.fileHeader = true client.sources.src-flume-log.filegroups.f1 =/u01/app/Flume/logs/flume.log client.sources.src-flume-log.idleTimeout=3000 client.sources.src-flume-log.batchSize=100 client.sources.src-flume-log.positionFile=/u01/app/Flume/offsets/offset_flume.json client.sources.src-flume-log.channels=file-flumelog-1 client.sources.src-flume-log.interceptors = i1 client.sources.src-flume-log.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder client.sources.src-flume-log.interceptors.i1.keyList=appCategory|appName|logEnv|logType client.sources.src-flume-log.interceptors.i1.valueList=x|y|prod|flume client.sources.src-flume-metrics.type = exec client.sources.src-flume-metrics.command = while true ; do echo ; sleep 5; done client.sources.src-flume-metrics.shell = /bin/sh -c client.sources.src-flume-metrics.channels = file-metrics-1 client.sources.src-flume-metrics.positionFile=/u01/app/Flume/offsets/offset_metrics.json client.sources.src-flume-metrics.interceptors = i1 i2 client.sources.src-flume-metrics.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder client.sources.src-flume-metrics.interceptors.i1.keyList = appCategory|appName|logEnv|logType client.sources.src-flume-metrics.interceptors.i1.valueList=x|y|prod|metrics client.sources.src-flume-metrics.interceptors.i2.type=com.divinen.FormatFlumeMetricsInterceptor$Builder #channel definations client.channels.file-channel-1.type=file client.channels.file-channel-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filechannel1 client.channels.file-channel-1.dataDirs=/u01/app/Flume/offsets/dataDir/filechannel1 client.channels.file-channel-1.checkpointOnClose=true client.channels.file-metrics-1.type=file client.channels.file-metrics-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filemetrics1 client.channels.file-metrics-1.dataDirs=/u01/app/Flume/offsets/dataDir/filemetrics1 client.channels.file-metrics-1.checkpointOnClose=true client.channels.file-flumelog-1.type=file client.channels.file-flumelog-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/fileflumelog1 client.channels.file-flumelog-1.dataDirs=/u01/app/Flume/offsets/dataDir/fileflumelog1 client.channels.file-flumelog-1.checkpointOnClose=true #sink definations client.sinks.avro-sink-1.type=avro client.sinks.avro-sink-1.hostname=logagg-prod.prod.divinen.net client.sinks.avro-sink-1.port=443 client.sinks.avro-sink-1.compression-type=deflate client.sinks.avro-sink-1.channel=file-channel-1 client.sinks.avro-sink-1.ssl=false client.sinks.avro-sink-1.trust-all-certs=false client.sinks.avro-metrics-sink-1.type=avro client.sinks.avro-metrics-sink-1.hostname=logagg-prod.prod.divinen.net client.sinks.avro-metrics-sink-1.port=443 client.sinks.avro-metrics-sink-1.compression-type=deflate client.sinks.avro-metrics-sink-1.channel=file-metrics-1 client.sinks.avro-metrics-sink-1.ssl=false client.sinks.avro-metrics-sink-1.trust-all-certs=false client.sinks.avro-flumelog-sink-1.type=avro client.sinks.avro-flumelog-sink-1.hostname=logagg-prod.prod.divinen.net client.sinks.avro-flumelog-sink-1.port=443 client.sinks.avro-flumelog-sink-1.compression-type=deflate client.sinks.avro-flumelog-sink-1.channel=file-flumelog-1 client.sinks.avro-flumelog-sink-1.ssl=false client.sinks.avro-flumelog-sink-1.trust-all-certs=false {code} It actually works for a hours or so, but then the same failure to appendBatch issue, which seems to be an compression issue. At this time, I don't see anything in queueset, {code} ll /u01/app/Flume/offsets/checkPointDir/filechannel1/queueset/ total 0 cat /u01/app/Flume/offsets/checkPointDir/filechannel1/checkpoint.meta 0 4??Y?%?Z* * cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflighttakes J?6?K??y?u.#H? cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflightputs J?6?K??y?u.#H? {code} The end of filechannel1 dataDir log-2 and log-3 are all filled up by ?????? {code}{code} was: I'm getting following Failed to send batch error at NettyAvroRpcClient#appendBatch, {code} org.apache.flume.EventDeliveryException: Failed to send events at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:785) Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { logagg.prod.aws.cloud.divinen.net, port: 443 }: Failed to send batch at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:320) at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:373) ... 3 more Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: logagg.prod.aws.cloud.divinen.net, port: 443 }: Exception thrown from remote handler at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:402) at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:379) at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:308) ... 4 more Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128) at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:394) ... 6 more Caused by: java.lang.ArrayIndexOutOfBoundsException at org.jboss.netty.util.internal.jzlib.Tree.d_code(Tree.java:127) at org.jboss.netty.util.internal.jzlib.Deflate.compress_block(Deflate.java:637) at org.jboss.netty.util.internal.jzlib.Deflate._tr_flush_block(Deflate.java:860) at org.jboss.netty.util.internal.jzlib.Deflate.flush_block_only(Deflate.java:724) at org.jboss.netty.util.internal.jzlib.Deflate.deflate_slow(Deflate.java:1168) at org.jboss.netty.util.internal.jzlib.Deflate.deflate(Deflate.java:1601) at org.jboss.netty.util.internal.jzlib.ZStream.deflate(ZStream.java:140) at org.jboss.netty.handler.codec.compression.ZlibEncoder.encode(ZlibEncoder.java:282) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:66) at org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder.doEncode(OneToOneStrictEncoder.java:35) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) at org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:322) at org.jboss.netty.channel.Channels.write(Channels.java:725) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) at org.jboss.netty.channel.Channels.write(Channels.java:704) at org.jboss.netty.channel.Channels.write(Channels.java:671) at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:248) at org.apache.avro.ipc.NettyTransceiver.writeDataPack(NettyTransceiver.java:515) at org.apache.avro.ipc.NettyTransceiver.transceive(NettyTransceiver.java:476) at org.apache.avro.ipc.Requestor.request(Requestor.java:147) at org.apache.avro.ipc.Requestor.request(Requestor.java:129) at org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:84) at com.sun.proxy.$Proxy29.appendBatch(Unknown Source) at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:353) at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:1) at java.util.concurrent.FutureTask.run(FutureTask.java:277) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 1 more {code} My flume config is, {code} client.sources=source_1 source_2 src-flume-log src-flume-metrics client.channels = file-channel-1 file-metrics-1 file-flumelog-1 client.sinks = avro-sink-1 avro-metrics-sink-1 avro-flumelog-sink-1 #source definations client.sources.source_1.type=org.apache.flume.source.taildir.TaildirSource client.sources.source_1.filegroups = f1 client.sources.source_1.fileHeader = true client.sources.source_1.filegroups.f1 = /u01/app/Logs/microservice1/logging12.log client.sources.source_1.idleTimeout=3000 client.sources.source_1.batchSize=100 client.sources.source_1.channels=file-channel-1 client.sources.source_1.multiline=false client.sources.source_1.bufferSize=1048576 client.sources.source_1.positionFile=/u01/app/Flume/offsets/offset_microservice1.json client.sources.source_1.interceptors = filterLogsWithAppId cleanupEnricher logEnricher removeEmptyElems addFields client.sources.source_1.interceptors.filterLogsWithAppId.type = regex_filter client.sources.source_1.interceptors.filterLogsWithAppId.regex = .*applicationId=MicroService1.* client.sources.source_1.interceptors.filterLogsWithAppId.excludeEvents = false client.sources.source_1.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder client.sources.source_1.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder client.sources.source_1.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder client.sources.source_1.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder client.sources.source_1.interceptors.addFields.keyList=appCategory|appName|logEnv|logType client.sources.source_1.interceptors.addFields.valueList=x|y|prod|microservice1 client.sources.source_2.type=org.apache.flume.source.taildir.TaildirSource client.sources.source_2.filegroups = f1 client.sources.source_2.fileHeader = true client.sources.source_2.filegroups.f1 = /u01/app/Logs/microservice2/logging12.log client.sources.source_2.idleTimeout=3000 client.sources.source_2.batchSize=100 client.sources.source_2.channels=file-channel-1 client.sources.source_2.multiline=false client.sources.source_2.bufferSize=1048576 client.sources.source_2.positionFile=/u01/app/Flume/offsets/offset_microservice2.json client.sources.source_2.interceptors = filterLogsWithAppId cleanupEnricher logEnricher removeEmptyElems addFields client.sources.source_2.interceptors.filterLogsWithAppId.type = regex_filter client.sources.source_2.interceptors.filterLogsWithAppId.regex = .*applicationId=MicroService2.* client.sources.source_2.interceptors.filterLogsWithAppId.excludeEvents = false client.sources.source_2.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder client.sources.source_2.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder client.sources.source_2.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder client.sources.source_2.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder client.sources.source_2.interceptors.addFields.keyList=appCategory|appName|logEnv|logType client.sources.source_2.interceptors.addFields.valueList=x|y|prod|microservice2 client.sources.src-flume-log.type=org.apache.flume.source.taildir.TaildirSource client.sources.src-flume-log.filegroups = f1 client.sources.src-flume-log.fileHeader = true client.sources.src-flume-log.filegroups.f1 =/u01/app/Flume/logs/flume.log client.sources.src-flume-log.idleTimeout=3000 client.sources.src-flume-log.batchSize=100 client.sources.src-flume-log.positionFile=/u01/app/Flume/offsets/offset_flume.json client.sources.src-flume-log.channels=file-flumelog-1 client.sources.src-flume-log.interceptors = i1 client.sources.src-flume-log.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder client.sources.src-flume-log.interceptors.i1.keyList=appCategory|appName|logEnv|logType client.sources.src-flume-log.interceptors.i1.valueList=x|y|prod|flume client.sources.src-flume-metrics.type = exec client.sources.src-flume-metrics.command = while true ; do echo ; sleep 5; done client.sources.src-flume-metrics.shell = /bin/sh -c client.sources.src-flume-metrics.channels = file-metrics-1 client.sources.src-flume-metrics.positionFile=/u01/app/Flume/offsets/offset_metrics.json client.sources.src-flume-metrics.interceptors = i1 i2 client.sources.src-flume-metrics.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder client.sources.src-flume-metrics.interceptors.i1.keyList = appCategory|appName|logEnv|logType client.sources.src-flume-metrics.interceptors.i1.valueList=x|y|prod|metrics client.sources.src-flume-metrics.interceptors.i2.type=com.divinen.FormatFlumeMetricsInterceptor$Builder #channel definations client.channels.file-channel-1.type=file client.channels.file-channel-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filechannel1 client.channels.file-channel-1.dataDirs=/u01/app/Flume/offsets/dataDir/filechannel1 client.channels.file-channel-1.checkpointOnClose=true client.channels.file-metrics-1.type=file client.channels.file-metrics-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filemetrics1 client.channels.file-metrics-1.dataDirs=/u01/app/Flume/offsets/dataDir/filemetrics1 client.channels.file-metrics-1.checkpointOnClose=true client.channels.file-flumelog-1.type=file client.channels.file-flumelog-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/fileflumelog1 client.channels.file-flumelog-1.dataDirs=/u01/app/Flume/offsets/dataDir/fileflumelog1 client.channels.file-flumelog-1.checkpointOnClose=true #sink definations client.sinks.avro-sink-1.type=avro client.sinks.avro-sink-1.hostname=logagg-prod.prod.divinen.net client.sinks.avro-sink-1.port=443 client.sinks.avro-sink-1.compression-type=deflate client.sinks.avro-sink-1.channel=file-channel-1 client.sinks.avro-sink-1.ssl=false client.sinks.avro-sink-1.trust-all-certs=false client.sinks.avro-metrics-sink-1.type=avro client.sinks.avro-metrics-sink-1.hostname=logagg-prod.prod.divinen.net client.sinks.avro-metrics-sink-1.port=443 client.sinks.avro-metrics-sink-1.compression-type=deflate client.sinks.avro-metrics-sink-1.channel=file-metrics-1 client.sinks.avro-metrics-sink-1.ssl=false client.sinks.avro-metrics-sink-1.trust-all-certs=false client.sinks.avro-flumelog-sink-1.type=avro client.sinks.avro-flumelog-sink-1.hostname=logagg-prod.prod.divinen.net client.sinks.avro-flumelog-sink-1.port=443 client.sinks.avro-flumelog-sink-1.compression-type=deflate client.sinks.avro-flumelog-sink-1.channel=file-flumelog-1 client.sinks.avro-flumelog-sink-1.ssl=false client.sinks.avro-flumelog-sink-1.trust-all-certs=false {code} It actually works for a hours or so, but then the same failure to appendBatch issue, which seems to be an compression issue. At this time, I don't see anything in queueset, {code} ll /u01/app/Flume/offsets/checkPointDir/filechannel1/queueset/ total 0 cat /u01/app/Flume/offsets/checkPointDir/filechannel1/checkpoint.meta 0 4??Y?%?Z* * cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflighttakes J?6?K??y?u.#H? cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflightputs J?6?K??y?u.#H? {code} The end of filechannel1 dataDir log-2 and log-3 are all filled up by ?????? {code}{code} > Flume client can not append events batch, deflate compression is failing > ------------------------------------------------------------------------ > > Key: FLUME-3040 > URL: https://issues.apache.org/jira/browse/FLUME-3040 > Project: Flume > Issue Type: Question > Components: File Channel > Affects Versions: v1.6.0 > Environment: jvm 1.8, flume 1.6. > {code} > java -version > java version "1.8.0" > Java(TM) SE Runtime Environment (build pxa6480sr3fp20-20161019_02(SR3 FP20)) > IBM J9 VM (build 2.8, JRE 1.8.0 Linux amd64-64 Compressed References > 20161013_322271 (JIT enabled, AOT enabled) > J9VM - R28_Java8_SR3_20161013_1635_B322271 > JIT - tr.r14.java.green_20161011_125790 > GC - R28_Java8_SR3_20161013_1635_B322271_CMPRSS > J9CL - 20161013_322271) > JCL - 20161018_01 based on Oracle jdk8u111-b14 > {code} > Reporter: prayagupd > > I'm getting following Failed to send batch error at > NettyAvroRpcClient#appendBatch, > https://github.com/apache/flume/blob/d9c9a7dd9a6889ecf6b9dc88fb8e02ccc1cd5167/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java#L314 > {code} > org.apache.flume.EventDeliveryException: Failed to send events > at > org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389) > at > org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) > at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) > at java.lang.Thread.run(Thread.java:785) > Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { > logagg.prod.aws.cloud.divinen.net, port: 443 }: Failed to send batch > at > org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:320) > at > org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:373) > ... 3 more > Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { > host: logagg.prod.aws.cloud.divinen.net, port: 443 }: Exception thrown from > remote handler > at > org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:402) > at > org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:379) > at > org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:308) > ... 4 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.ArrayIndexOutOfBoundsException > at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128) > at > org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:394) > ... 6 more > Caused by: java.lang.ArrayIndexOutOfBoundsException > at org.jboss.netty.util.internal.jzlib.Tree.d_code(Tree.java:127) > at > org.jboss.netty.util.internal.jzlib.Deflate.compress_block(Deflate.java:637) > at > org.jboss.netty.util.internal.jzlib.Deflate._tr_flush_block(Deflate.java:860) > at > org.jboss.netty.util.internal.jzlib.Deflate.flush_block_only(Deflate.java:724) > at > org.jboss.netty.util.internal.jzlib.Deflate.deflate_slow(Deflate.java:1168) > at > org.jboss.netty.util.internal.jzlib.Deflate.deflate(Deflate.java:1601) > at org.jboss.netty.util.internal.jzlib.ZStream.deflate(ZStream.java:140) > at > org.jboss.netty.handler.codec.compression.ZlibEncoder.encode(ZlibEncoder.java:282) > at > org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:66) > at > org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder.doEncode(OneToOneStrictEncoder.java:35) > at > org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) > at > org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:322) > at org.jboss.netty.channel.Channels.write(Channels.java:725) > at > org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71) > at > org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) > at org.jboss.netty.channel.Channels.write(Channels.java:704) > at org.jboss.netty.channel.Channels.write(Channels.java:671) > at > org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:248) > at > org.apache.avro.ipc.NettyTransceiver.writeDataPack(NettyTransceiver.java:515) > at > org.apache.avro.ipc.NettyTransceiver.transceive(NettyTransceiver.java:476) > at org.apache.avro.ipc.Requestor.request(Requestor.java:147) > at org.apache.avro.ipc.Requestor.request(Requestor.java:129) > at > org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:84) > at com.sun.proxy.$Proxy29.appendBatch(Unknown Source) > at > org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:353) > at > org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:1) > at java.util.concurrent.FutureTask.run(FutureTask.java:277) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > ... 1 more > {code} > My flume config is, > {code} > client.sources=source_1 source_2 src-flume-log src-flume-metrics > client.channels = file-channel-1 file-metrics-1 file-flumelog-1 > client.sinks = avro-sink-1 avro-metrics-sink-1 avro-flumelog-sink-1 > #source definations > client.sources.source_1.type=org.apache.flume.source.taildir.TaildirSource > client.sources.source_1.filegroups = f1 > client.sources.source_1.fileHeader = true > client.sources.source_1.filegroups.f1 = > /u01/app/Logs/microservice1/logging12.log > client.sources.source_1.idleTimeout=3000 > client.sources.source_1.batchSize=100 > client.sources.source_1.channels=file-channel-1 > client.sources.source_1.multiline=false > client.sources.source_1.bufferSize=1048576 > client.sources.source_1.positionFile=/u01/app/Flume/offsets/offset_microservice1.json > client.sources.source_1.interceptors = filterLogsWithAppId cleanupEnricher > logEnricher removeEmptyElems addFields > client.sources.source_1.interceptors.filterLogsWithAppId.type = regex_filter > client.sources.source_1.interceptors.filterLogsWithAppId.regex = > .*applicationId=MicroService1.* > client.sources.source_1.interceptors.filterLogsWithAppId.excludeEvents = false > client.sources.source_1.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder > client.sources.source_1.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder > client.sources.source_1.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder > client.sources.source_1.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder > client.sources.source_1.interceptors.addFields.keyList=appCategory|appName|logEnv|logType > client.sources.source_1.interceptors.addFields.valueList=x|y|prod|microservice1 > client.sources.source_2.type=org.apache.flume.source.taildir.TaildirSource > client.sources.source_2.filegroups = f1 > client.sources.source_2.fileHeader = true > client.sources.source_2.filegroups.f1 = > /u01/app/Logs/microservice2/logging12.log > client.sources.source_2.idleTimeout=3000 > client.sources.source_2.batchSize=100 > client.sources.source_2.channels=file-channel-1 > client.sources.source_2.multiline=false > client.sources.source_2.bufferSize=1048576 > client.sources.source_2.positionFile=/u01/app/Flume/offsets/offset_microservice2.json > client.sources.source_2.interceptors = filterLogsWithAppId cleanupEnricher > logEnricher removeEmptyElems addFields > client.sources.source_2.interceptors.filterLogsWithAppId.type = regex_filter > client.sources.source_2.interceptors.filterLogsWithAppId.regex = > .*applicationId=MicroService2.* > client.sources.source_2.interceptors.filterLogsWithAppId.excludeEvents = false > client.sources.source_2.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder > client.sources.source_2.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder > client.sources.source_2.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder > client.sources.source_2.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder > client.sources.source_2.interceptors.addFields.keyList=appCategory|appName|logEnv|logType > client.sources.source_2.interceptors.addFields.valueList=x|y|prod|microservice2 > client.sources.src-flume-log.type=org.apache.flume.source.taildir.TaildirSource > client.sources.src-flume-log.filegroups = f1 > client.sources.src-flume-log.fileHeader = true > client.sources.src-flume-log.filegroups.f1 =/u01/app/Flume/logs/flume.log > client.sources.src-flume-log.idleTimeout=3000 > client.sources.src-flume-log.batchSize=100 > client.sources.src-flume-log.positionFile=/u01/app/Flume/offsets/offset_flume.json > client.sources.src-flume-log.channels=file-flumelog-1 > client.sources.src-flume-log.interceptors = i1 > client.sources.src-flume-log.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder > client.sources.src-flume-log.interceptors.i1.keyList=appCategory|appName|logEnv|logType > client.sources.src-flume-log.interceptors.i1.valueList=x|y|prod|flume > client.sources.src-flume-metrics.type = exec > client.sources.src-flume-metrics.command = while true ; do echo ; sleep 5; > done > client.sources.src-flume-metrics.shell = /bin/sh -c > client.sources.src-flume-metrics.channels = file-metrics-1 > client.sources.src-flume-metrics.positionFile=/u01/app/Flume/offsets/offset_metrics.json > client.sources.src-flume-metrics.interceptors = i1 i2 > client.sources.src-flume-metrics.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder > client.sources.src-flume-metrics.interceptors.i1.keyList = > appCategory|appName|logEnv|logType > client.sources.src-flume-metrics.interceptors.i1.valueList=x|y|prod|metrics > client.sources.src-flume-metrics.interceptors.i2.type=com.divinen.FormatFlumeMetricsInterceptor$Builder > #channel definations > client.channels.file-channel-1.type=file > client.channels.file-channel-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filechannel1 > client.channels.file-channel-1.dataDirs=/u01/app/Flume/offsets/dataDir/filechannel1 > client.channels.file-channel-1.checkpointOnClose=true > client.channels.file-metrics-1.type=file > client.channels.file-metrics-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filemetrics1 > client.channels.file-metrics-1.dataDirs=/u01/app/Flume/offsets/dataDir/filemetrics1 > client.channels.file-metrics-1.checkpointOnClose=true > client.channels.file-flumelog-1.type=file > client.channels.file-flumelog-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/fileflumelog1 > client.channels.file-flumelog-1.dataDirs=/u01/app/Flume/offsets/dataDir/fileflumelog1 > client.channels.file-flumelog-1.checkpointOnClose=true > #sink definations > client.sinks.avro-sink-1.type=avro > client.sinks.avro-sink-1.hostname=logagg-prod.prod.divinen.net > client.sinks.avro-sink-1.port=443 > client.sinks.avro-sink-1.compression-type=deflate > client.sinks.avro-sink-1.channel=file-channel-1 > client.sinks.avro-sink-1.ssl=false > client.sinks.avro-sink-1.trust-all-certs=false > client.sinks.avro-metrics-sink-1.type=avro > client.sinks.avro-metrics-sink-1.hostname=logagg-prod.prod.divinen.net > client.sinks.avro-metrics-sink-1.port=443 > client.sinks.avro-metrics-sink-1.compression-type=deflate > client.sinks.avro-metrics-sink-1.channel=file-metrics-1 > client.sinks.avro-metrics-sink-1.ssl=false > client.sinks.avro-metrics-sink-1.trust-all-certs=false > client.sinks.avro-flumelog-sink-1.type=avro > client.sinks.avro-flumelog-sink-1.hostname=logagg-prod.prod.divinen.net > client.sinks.avro-flumelog-sink-1.port=443 > client.sinks.avro-flumelog-sink-1.compression-type=deflate > client.sinks.avro-flumelog-sink-1.channel=file-flumelog-1 > client.sinks.avro-flumelog-sink-1.ssl=false > client.sinks.avro-flumelog-sink-1.trust-all-certs=false > {code} > It actually works for a hours or so, but then the same failure to appendBatch > issue, which seems to be an compression issue. > At this time, I don't see anything in queueset, > {code} > ll /u01/app/Flume/offsets/checkPointDir/filechannel1/queueset/ > total 0 > cat /u01/app/Flume/offsets/checkPointDir/filechannel1/checkpoint.meta > 0 > 4??Y?%?Z* > * > cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflighttakes > J?6?K??y?u.#H? > cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflightputs > J?6?K??y?u.#H? > {code} > The end of filechannel1 dataDir log-2 and log-3 are all filled up by ?????? > {code}{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)