prayagupd created FLUME-3040:
--------------------------------

             Summary: Flume client can not append events batch, deflate 
compression is failing
                 Key: FLUME-3040
                 URL: https://issues.apache.org/jira/browse/FLUME-3040
             Project: Flume
          Issue Type: Question
          Components: File Channel
    Affects Versions: v1.6.0
         Environment: I'm using, jvm 1.8 and flume 1.6.

{code}
java -version
java version "1.8.0"
Java(TM) SE Runtime Environment (build pxa6480sr3fp20-20161019_02(SR3 FP20))
IBM J9 VM (build 2.8, JRE 1.8.0 Linux amd64-64 Compressed References 
20161013_322271 (JIT enabled, AOT enabled)
J9VM - R28_Java8_SR3_20161013_1635_B322271
JIT  - tr.r14.java.green_20161011_125790
GC   - R28_Java8_SR3_20161013_1635_B322271_CMPRSS
J9CL - 20161013_322271)
JCL - 20161018_01 based on Oracle jdk8u111-b14
{code}
            Reporter: prayagupd


I'm getting following Failed to send batch error at 
NettyAvroRpcClient#appendBatch

{code}
org.apache.flume.EventDeliveryException: Failed to send events
        at 
org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389)
        at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:785)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { 
logagg.prod.aws.cloud.divinen.net, port: 443 }: Failed to send batch
        at 
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:320)
        at 
org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:373)
        ... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 
logagg.prod.aws.cloud.divinen.net, port: 443 }: Exception thrown from remote 
handler
        at 
org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:402)
        at 
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:379)
        at 
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:308)
        ... 4 more
Caused by: java.util.concurrent.ExecutionException: 
java.lang.ArrayIndexOutOfBoundsException
        at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
        at 
org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:394)
        ... 6 more
Caused by: java.lang.ArrayIndexOutOfBoundsException
        at org.jboss.netty.util.internal.jzlib.Tree.d_code(Tree.java:127)
        at 
org.jboss.netty.util.internal.jzlib.Deflate.compress_block(Deflate.java:637)
        at 
org.jboss.netty.util.internal.jzlib.Deflate._tr_flush_block(Deflate.java:860)
        at 
org.jboss.netty.util.internal.jzlib.Deflate.flush_block_only(Deflate.java:724)
        at 
org.jboss.netty.util.internal.jzlib.Deflate.deflate_slow(Deflate.java:1168)
        at 
org.jboss.netty.util.internal.jzlib.Deflate.deflate(Deflate.java:1601)
        at org.jboss.netty.util.internal.jzlib.ZStream.deflate(ZStream.java:140)
        at 
org.jboss.netty.handler.codec.compression.ZlibEncoder.encode(ZlibEncoder.java:282)
        at 
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:66)
        at 
org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder.doEncode(OneToOneStrictEncoder.java:35)
        at 
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59)
        at 
org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:322)
        at org.jboss.netty.channel.Channels.write(Channels.java:725)
        at 
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71)
        at 
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59)
        at org.jboss.netty.channel.Channels.write(Channels.java:704)
        at org.jboss.netty.channel.Channels.write(Channels.java:671)
        at 
org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:248)
        at 
org.apache.avro.ipc.NettyTransceiver.writeDataPack(NettyTransceiver.java:515)
        at 
org.apache.avro.ipc.NettyTransceiver.transceive(NettyTransceiver.java:476)
        at org.apache.avro.ipc.Requestor.request(Requestor.java:147)
        at org.apache.avro.ipc.Requestor.request(Requestor.java:129)
        at 
org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:84)
        at com.sun.proxy.$Proxy29.appendBatch(Unknown Source)
        at 
org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:353)
        at 
org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:277)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
{code}

My flume config is,

{code}
client.sources=source_1 source_2 src-flume-log src-flume-metrics 
client.channels = file-channel-1 file-metrics-1 file-flumelog-1
client.sinks = avro-sink-1 avro-metrics-sink-1 avro-flumelog-sink-1

#source definations
client.sources.source_1.type=org.apache.flume.source.taildir.TaildirSource
client.sources.source_1.filegroups = f1
client.sources.source_1.fileHeader = true
client.sources.source_1.filegroups.f1 = 
/u01/app/Logs/microservice1/logging12.log
client.sources.source_1.idleTimeout=3000
client.sources.source_1.batchSize=100
client.sources.source_1.channels=file-channel-1
client.sources.source_1.multiline=false
client.sources.source_1.bufferSize=1048576
client.sources.source_1.positionFile=/u01/app/Flume/offsets/offset_microservice1.json
client.sources.source_1.interceptors = filterLogsWithAppId cleanupEnricher 
logEnricher removeEmptyElems addFields
client.sources.source_1.interceptors.filterLogsWithAppId.type = regex_filter
client.sources.source_1.interceptors.filterLogsWithAppId.regex = 
.*applicationId=MicroService1.*
client.sources.source_1.interceptors.filterLogsWithAppId.excludeEvents = false
client.sources.source_1.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder
client.sources.source_1.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder
client.sources.source_1.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder
client.sources.source_1.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder
client.sources.source_1.interceptors.addFields.keyList=appCategory|appName|logEnv|logType
client.sources.source_1.interceptors.addFields.valueList=x|y|prod|microservice1

client.sources.source_2.type=org.apache.flume.source.taildir.TaildirSource
client.sources.source_2.filegroups = f1
client.sources.source_2.fileHeader = true
client.sources.source_2.filegroups.f1 = 
/u01/app/Logs/microservice2/logging12.log
client.sources.source_2.idleTimeout=3000
client.sources.source_2.batchSize=100
client.sources.source_2.channels=file-channel-1
client.sources.source_2.multiline=false
client.sources.source_2.bufferSize=1048576
client.sources.source_2.positionFile=/u01/app/Flume/offsets/offset_microservice2.json
client.sources.source_2.interceptors = filterLogsWithAppId cleanupEnricher 
logEnricher removeEmptyElems addFields
client.sources.source_2.interceptors.filterLogsWithAppId.type = regex_filter
client.sources.source_2.interceptors.filterLogsWithAppId.regex = 
.*applicationId=MicroService2.*
client.sources.source_2.interceptors.filterLogsWithAppId.excludeEvents = false
client.sources.source_2.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder
client.sources.source_2.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder
client.sources.source_2.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder
client.sources.source_2.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder
client.sources.source_2.interceptors.addFields.keyList=appCategory|appName|logEnv|logType
client.sources.source_2.interceptors.addFields.valueList=x|y|prod|microservice2

client.sources.src-flume-log.type=org.apache.flume.source.taildir.TaildirSource
client.sources.src-flume-log.filegroups = f1
client.sources.src-flume-log.fileHeader = true
client.sources.src-flume-log.filegroups.f1 =/u01/app/Flume/logs/flume.log
client.sources.src-flume-log.idleTimeout=3000
client.sources.src-flume-log.batchSize=100
client.sources.src-flume-log.positionFile=/u01/app/Flume/offsets/offset_flume.json
client.sources.src-flume-log.channels=file-flumelog-1
client.sources.src-flume-log.interceptors = i1 
client.sources.src-flume-log.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder
client.sources.src-flume-log.interceptors.i1.keyList=appCategory|appName|logEnv|logType
client.sources.src-flume-log.interceptors.i1.valueList=x|y|prod|flume

client.sources.src-flume-metrics.type = exec
client.sources.src-flume-metrics.command = while true ; do echo  ; sleep 5; done
client.sources.src-flume-metrics.shell = /bin/sh -c
client.sources.src-flume-metrics.channels = file-metrics-1
client.sources.src-flume-metrics.positionFile=/u01/app/Flume/offsets/offset_metrics.json
client.sources.src-flume-metrics.interceptors = i1 i2
client.sources.src-flume-metrics.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder
client.sources.src-flume-metrics.interceptors.i1.keyList = 
appCategory|appName|logEnv|logType
client.sources.src-flume-metrics.interceptors.i1.valueList=x|y|prod|metrics
client.sources.src-flume-metrics.interceptors.i2.type=com.divinen.FormatFlumeMetricsInterceptor$Builder

#channel definations

client.channels.file-channel-1.type=file
client.channels.file-channel-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filechannel1
client.channels.file-channel-1.dataDirs=/u01/app/Flume/offsets/dataDir/filechannel1
client.channels.file-channel-1.checkpointOnClose=true

client.channels.file-metrics-1.type=file
client.channels.file-metrics-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filemetrics1
client.channels.file-metrics-1.dataDirs=/u01/app/Flume/offsets/dataDir/filemetrics1
client.channels.file-metrics-1.checkpointOnClose=true

client.channels.file-flumelog-1.type=file
client.channels.file-flumelog-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/fileflumelog1
client.channels.file-flumelog-1.dataDirs=/u01/app/Flume/offsets/dataDir/fileflumelog1
client.channels.file-flumelog-1.checkpointOnClose=true

#sink definations

client.sinks.avro-sink-1.type=avro
client.sinks.avro-sink-1.hostname=logagg-prod.prod.divinen.net
client.sinks.avro-sink-1.port=443
client.sinks.avro-sink-1.compression-type=deflate
client.sinks.avro-sink-1.channel=file-channel-1
client.sinks.avro-sink-1.ssl=false
client.sinks.avro-sink-1.trust-all-certs=false

client.sinks.avro-metrics-sink-1.type=avro
client.sinks.avro-metrics-sink-1.hostname=logagg-prod.prod.divinen.net
client.sinks.avro-metrics-sink-1.port=443
client.sinks.avro-metrics-sink-1.compression-type=deflate
client.sinks.avro-metrics-sink-1.channel=file-metrics-1
client.sinks.avro-metrics-sink-1.ssl=false
client.sinks.avro-metrics-sink-1.trust-all-certs=false

client.sinks.avro-flumelog-sink-1.type=avro
client.sinks.avro-flumelog-sink-1.hostname=logagg-prod.prod.divinen.net
client.sinks.avro-flumelog-sink-1.port=443
client.sinks.avro-flumelog-sink-1.compression-type=deflate
client.sinks.avro-flumelog-sink-1.channel=file-flumelog-1
client.sinks.avro-flumelog-sink-1.ssl=false
client.sinks.avro-flumelog-sink-1.trust-all-certs=false
{code}


It actually works for a hours or so, but then the same failure to appendBatch 
issue, which seems to be an compression issue.

At this time, I don't see anything in queueset,

{code}

ll /u01/app/Flume/offsets/checkPointDir/filechannel1/queueset/
total 0

cat /u01/app/Flume/offsets/checkPointDir/filechannel1/checkpoint.meta 
0
4??Y?%?Z*
*

cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflighttakes 
J?6?K??y?u.#H?

cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflightputs 
J?6?K??y?u.#H?
{code}

The end of filechannel1 dataDir log-2 and log-3 are all filled up by ??????

{code}
???????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????

{code}





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to