prayagupd created FLUME-3040: -------------------------------- Summary: Flume client can not append events batch, deflate compression is failing Key: FLUME-3040 URL: https://issues.apache.org/jira/browse/FLUME-3040 Project: Flume Issue Type: Question Components: File Channel Affects Versions: v1.6.0 Environment: I'm using, jvm 1.8 and flume 1.6.
{code} java -version java version "1.8.0" Java(TM) SE Runtime Environment (build pxa6480sr3fp20-20161019_02(SR3 FP20)) IBM J9 VM (build 2.8, JRE 1.8.0 Linux amd64-64 Compressed References 20161013_322271 (JIT enabled, AOT enabled) J9VM - R28_Java8_SR3_20161013_1635_B322271 JIT - tr.r14.java.green_20161011_125790 GC - R28_Java8_SR3_20161013_1635_B322271_CMPRSS J9CL - 20161013_322271) JCL - 20161018_01 based on Oracle jdk8u111-b14 {code} Reporter: prayagupd I'm getting following Failed to send batch error at NettyAvroRpcClient#appendBatch {code} org.apache.flume.EventDeliveryException: Failed to send events at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:785) Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { logagg.prod.aws.cloud.divinen.net, port: 443 }: Failed to send batch at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:320) at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:373) ... 3 more Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: logagg.prod.aws.cloud.divinen.net, port: 443 }: Exception thrown from remote handler at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:402) at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:379) at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:308) ... 4 more Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128) at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:394) ... 6 more Caused by: java.lang.ArrayIndexOutOfBoundsException at org.jboss.netty.util.internal.jzlib.Tree.d_code(Tree.java:127) at org.jboss.netty.util.internal.jzlib.Deflate.compress_block(Deflate.java:637) at org.jboss.netty.util.internal.jzlib.Deflate._tr_flush_block(Deflate.java:860) at org.jboss.netty.util.internal.jzlib.Deflate.flush_block_only(Deflate.java:724) at org.jboss.netty.util.internal.jzlib.Deflate.deflate_slow(Deflate.java:1168) at org.jboss.netty.util.internal.jzlib.Deflate.deflate(Deflate.java:1601) at org.jboss.netty.util.internal.jzlib.ZStream.deflate(ZStream.java:140) at org.jboss.netty.handler.codec.compression.ZlibEncoder.encode(ZlibEncoder.java:282) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:66) at org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder.doEncode(OneToOneStrictEncoder.java:35) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) at org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:322) at org.jboss.netty.channel.Channels.write(Channels.java:725) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) at org.jboss.netty.channel.Channels.write(Channels.java:704) at org.jboss.netty.channel.Channels.write(Channels.java:671) at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:248) at org.apache.avro.ipc.NettyTransceiver.writeDataPack(NettyTransceiver.java:515) at org.apache.avro.ipc.NettyTransceiver.transceive(NettyTransceiver.java:476) at org.apache.avro.ipc.Requestor.request(Requestor.java:147) at org.apache.avro.ipc.Requestor.request(Requestor.java:129) at org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:84) at com.sun.proxy.$Proxy29.appendBatch(Unknown Source) at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:353) at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:1) at java.util.concurrent.FutureTask.run(FutureTask.java:277) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 1 more {code} My flume config is, {code} client.sources=source_1 source_2 src-flume-log src-flume-metrics client.channels = file-channel-1 file-metrics-1 file-flumelog-1 client.sinks = avro-sink-1 avro-metrics-sink-1 avro-flumelog-sink-1 #source definations client.sources.source_1.type=org.apache.flume.source.taildir.TaildirSource client.sources.source_1.filegroups = f1 client.sources.source_1.fileHeader = true client.sources.source_1.filegroups.f1 = /u01/app/Logs/microservice1/logging12.log client.sources.source_1.idleTimeout=3000 client.sources.source_1.batchSize=100 client.sources.source_1.channels=file-channel-1 client.sources.source_1.multiline=false client.sources.source_1.bufferSize=1048576 client.sources.source_1.positionFile=/u01/app/Flume/offsets/offset_microservice1.json client.sources.source_1.interceptors = filterLogsWithAppId cleanupEnricher logEnricher removeEmptyElems addFields client.sources.source_1.interceptors.filterLogsWithAppId.type = regex_filter client.sources.source_1.interceptors.filterLogsWithAppId.regex = .*applicationId=MicroService1.* client.sources.source_1.interceptors.filterLogsWithAppId.excludeEvents = false client.sources.source_1.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder client.sources.source_1.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder client.sources.source_1.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder client.sources.source_1.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder client.sources.source_1.interceptors.addFields.keyList=appCategory|appName|logEnv|logType client.sources.source_1.interceptors.addFields.valueList=x|y|prod|microservice1 client.sources.source_2.type=org.apache.flume.source.taildir.TaildirSource client.sources.source_2.filegroups = f1 client.sources.source_2.fileHeader = true client.sources.source_2.filegroups.f1 = /u01/app/Logs/microservice2/logging12.log client.sources.source_2.idleTimeout=3000 client.sources.source_2.batchSize=100 client.sources.source_2.channels=file-channel-1 client.sources.source_2.multiline=false client.sources.source_2.bufferSize=1048576 client.sources.source_2.positionFile=/u01/app/Flume/offsets/offset_microservice2.json client.sources.source_2.interceptors = filterLogsWithAppId cleanupEnricher logEnricher removeEmptyElems addFields client.sources.source_2.interceptors.filterLogsWithAppId.type = regex_filter client.sources.source_2.interceptors.filterLogsWithAppId.regex = .*applicationId=MicroService2.* client.sources.source_2.interceptors.filterLogsWithAppId.excludeEvents = false client.sources.source_2.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder client.sources.source_2.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder client.sources.source_2.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder client.sources.source_2.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder client.sources.source_2.interceptors.addFields.keyList=appCategory|appName|logEnv|logType client.sources.source_2.interceptors.addFields.valueList=x|y|prod|microservice2 client.sources.src-flume-log.type=org.apache.flume.source.taildir.TaildirSource client.sources.src-flume-log.filegroups = f1 client.sources.src-flume-log.fileHeader = true client.sources.src-flume-log.filegroups.f1 =/u01/app/Flume/logs/flume.log client.sources.src-flume-log.idleTimeout=3000 client.sources.src-flume-log.batchSize=100 client.sources.src-flume-log.positionFile=/u01/app/Flume/offsets/offset_flume.json client.sources.src-flume-log.channels=file-flumelog-1 client.sources.src-flume-log.interceptors = i1 client.sources.src-flume-log.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder client.sources.src-flume-log.interceptors.i1.keyList=appCategory|appName|logEnv|logType client.sources.src-flume-log.interceptors.i1.valueList=x|y|prod|flume client.sources.src-flume-metrics.type = exec client.sources.src-flume-metrics.command = while true ; do echo ; sleep 5; done client.sources.src-flume-metrics.shell = /bin/sh -c client.sources.src-flume-metrics.channels = file-metrics-1 client.sources.src-flume-metrics.positionFile=/u01/app/Flume/offsets/offset_metrics.json client.sources.src-flume-metrics.interceptors = i1 i2 client.sources.src-flume-metrics.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder client.sources.src-flume-metrics.interceptors.i1.keyList = appCategory|appName|logEnv|logType client.sources.src-flume-metrics.interceptors.i1.valueList=x|y|prod|metrics client.sources.src-flume-metrics.interceptors.i2.type=com.divinen.FormatFlumeMetricsInterceptor$Builder #channel definations client.channels.file-channel-1.type=file client.channels.file-channel-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filechannel1 client.channels.file-channel-1.dataDirs=/u01/app/Flume/offsets/dataDir/filechannel1 client.channels.file-channel-1.checkpointOnClose=true client.channels.file-metrics-1.type=file client.channels.file-metrics-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filemetrics1 client.channels.file-metrics-1.dataDirs=/u01/app/Flume/offsets/dataDir/filemetrics1 client.channels.file-metrics-1.checkpointOnClose=true client.channels.file-flumelog-1.type=file client.channels.file-flumelog-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/fileflumelog1 client.channels.file-flumelog-1.dataDirs=/u01/app/Flume/offsets/dataDir/fileflumelog1 client.channels.file-flumelog-1.checkpointOnClose=true #sink definations client.sinks.avro-sink-1.type=avro client.sinks.avro-sink-1.hostname=logagg-prod.prod.divinen.net client.sinks.avro-sink-1.port=443 client.sinks.avro-sink-1.compression-type=deflate client.sinks.avro-sink-1.channel=file-channel-1 client.sinks.avro-sink-1.ssl=false client.sinks.avro-sink-1.trust-all-certs=false client.sinks.avro-metrics-sink-1.type=avro client.sinks.avro-metrics-sink-1.hostname=logagg-prod.prod.divinen.net client.sinks.avro-metrics-sink-1.port=443 client.sinks.avro-metrics-sink-1.compression-type=deflate client.sinks.avro-metrics-sink-1.channel=file-metrics-1 client.sinks.avro-metrics-sink-1.ssl=false client.sinks.avro-metrics-sink-1.trust-all-certs=false client.sinks.avro-flumelog-sink-1.type=avro client.sinks.avro-flumelog-sink-1.hostname=logagg-prod.prod.divinen.net client.sinks.avro-flumelog-sink-1.port=443 client.sinks.avro-flumelog-sink-1.compression-type=deflate client.sinks.avro-flumelog-sink-1.channel=file-flumelog-1 client.sinks.avro-flumelog-sink-1.ssl=false client.sinks.avro-flumelog-sink-1.trust-all-certs=false {code} It actually works for a hours or so, but then the same failure to appendBatch issue, which seems to be an compression issue. At this time, I don't see anything in queueset, {code} ll /u01/app/Flume/offsets/checkPointDir/filechannel1/queueset/ total 0 cat /u01/app/Flume/offsets/checkPointDir/filechannel1/checkpoint.meta 0 4??Y?%?Z* * cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflighttakes J?6?K??y?u.#H? cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflightputs J?6?K??y?u.#H? {code} The end of filechannel1 dataDir log-2 and log-3 are all filled up by ?????? {code} ??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????? {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)