[ https://issues.apache.org/jira/browse/BEAM-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Cwik resolved BEAM-2373.
-----------------------------
    Resolution: Fixed

> AvroSource: Premature End of stream Exception on SnappyCompressorInputStream
> ----------------------------------------------------------------------------
>
>                 Key: BEAM-2373
>                 URL: https://issues.apache.org/jira/browse/BEAM-2373
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.0.0
>            Reporter: Michael Luckey
>            Assignee: Michael Luckey
>            Priority: Critical
>             Fix For: 2.1.0
>
>
> During processing, we encountered the following exception on some of our Snappy-encoded Avro input files:
> {noformat}
> Exception in thread "main" java.lang.RuntimeException: java.io.IOException: Premature end of stream
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:330)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> Caused by: java.io.IOException: Premature end of stream
>     at org.apache.beam.sdk.repackaged.org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream.expandLiteral(SnappyCompressorInputStream.java:310)
>     at org.apache.beam.sdk.repackaged.org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream.fill(SnappyCompressorInputStream.java:169)
>     at org.apache.beam.sdk.repackaged.org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream.read(SnappyCompressorInputStream.java:134)
>     at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.tryReadRaw(BinaryDecoder.java:839)
>     at org.apache.avro.io.BinaryDecoder$ByteSource.compactAndFill(BinaryDecoder.java:692)
>     at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:471)
>     at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
>     at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
>     at org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:579)
>     at org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:198)
>     at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:479)
>     at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:277)
>     at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:148)
>     at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
>     at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This seems to be caused by a bug in Apache Commons Compress 1.9, which was addressed here:
> https://github.com/apache/commons-compress/commit/9ae37525134089dd0c9ee1cf8738192b70e0fc07
> Setup: a pipeline using AvroIO, run on both the Spark and Direct runners, reading from both HDFS and the local file system.
> In our brief tests, the pipeline ran without exceptions after either:
> * upgrading to commons-compress 1.14, or
> * applying that patch to the 1.9 version of SnappyCompressorInputStream.
> We did not test the impact on other components, of course :(
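The report does not say what the linked commons-compress commit changes. As a general illustration only (an assumption, not a description of that commit or of the actual SnappyCompressorInputStream code), one way a decoder can raise a spurious "Premature end of stream" is by assuming that a single `InputStream.read(byte[], int, int)` call fills the whole requested range. The `java.io.InputStream` contract allows that call to return fewer bytes than requested even when more data follows. The sketch below contrasts a single-read check with a loop that keeps reading until the range is full or the stream really ends; `ShortReadDemo`, `ChunkedInputStream`, `readOnce`, and `readFully` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Illustration only: hypothetical names, not commons-compress or Beam code. */
public class ShortReadDemo {

    /** Hypothetical wrapper that returns at most maxChunk bytes per read, as buffered or network streams may. */
    public static final class ChunkedInputStream extends FilterInputStream {
        private final int maxChunk;

        public ChunkedInputStream(InputStream in, int maxChunk) {
            super(in);
            this.maxChunk = maxChunk;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            // Legal per the InputStream contract: deliver fewer bytes than asked for.
            return super.read(b, off, Math.min(len, maxChunk));
        }
    }

    /** Fragile: treats any short read as the end of the stream. */
    public static void readOnce(InputStream in, byte[] buf, int off, int len) throws IOException {
        int n = in.read(buf, off, len);
        if (n != len) {
            throw new IOException("Premature end of stream");
        }
    }

    /** Robust: loops until len bytes are copied or read() reports EOF; returns the count actually read. */
    public static int readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
        int total = 0;
        while (total < len) {
            int n = in.read(buf, off + total, len - total);
            if (n < 0) {
                break; // genuine end of stream
            }
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "literal-bytes".getBytes(StandardCharsets.US_ASCII);
        byte[] buf = new byte[data.length];

        // The single read sees only 4 of 13 bytes and wrongly reports a premature end.
        try {
            readOnce(new ChunkedInputStream(new ByteArrayInputStream(data), 4), buf, 0, data.length);
            System.out.println("readOnce: ok");
        } catch (IOException e) {
            System.out.println("readOnce: " + e.getMessage());
        }

        // The loop collects all bytes from the same kind of chunked stream.
        int n = readFully(new ChunkedInputStream(new ByteArrayInputStream(data), 4), buf, 0, data.length);
        System.out.println("readFully: " + n + " bytes, \"" + new String(buf, 0, n, StandardCharsets.US_ASCII) + "\"");
    }
}
```

Running `main` prints `readOnce: Premature end of stream` followed by `readFully: 13 bytes, "literal-bytes"`. A caller of `readFully` still decides what to do when the returned count is short, which then indicates a truly truncated input rather than a short read.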