Hi,

I have a structured streaming job that listens to an HDFS folder containing
jsonl.gz files. The job crashed with the following error:

java.io.IOException: incorrect header check
    at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
    at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
    at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
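
For context, the read side of the job looks roughly like this (a simplified Scala sketch; the real paths and schema are different):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().appName("jsonl-gz-stream").getOrCreate()

// A streaming JSON source needs an explicit schema; these fields are placeholders.
val schema = new StructType()
  .add("id", StringType)
  .add("payload", StringType)

// Spark picks the gzip codec from the .gz extension, so a corrupt or truncated
// file surfaces as the "incorrect header check" IOException shown above.
val df = spark.readStream
  .schema(schema)
  .json("hdfs:///data/incoming")   // hypothetical folder receiving the *.jsonl.gz files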


Is there a way to skip the gz files that cannot be decompressed? Exception
handling does not seem to help. The only workaround I can think of is to
decompress (or at least validate) the gz files into another folder first and
make the streaming job listen to that new folder. But this workaround may not
be much better than the alternative of using a plain, non-structured-streaming
job to directly decompress each gz file, read the jsonl, validate the records,
and write the validated records to parquet.
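
To be concrete, the pre-validation workaround would look roughly like the sketch below (the paths and the copy-vs-move decision are placeholders): try to read each .gz file to the end, and only copy the files that decompress cleanly into the folder the streaming job watches.

import java.io.{BufferedInputStream, IOException}
import java.util.zip.GZIPInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)

val incoming  = new Path("hdfs:///data/incoming")   // raw .gz drop folder (hypothetical)
val validated = new Path("hdfs:///data/validated")  // folder the streaming job listens to (hypothetical)

fs.listStatus(incoming).filter(_.getPath.getName.endsWith(".gz")).foreach { status =>
  val src = status.getPath
  val readable =
    try {
      val in = new GZIPInputStream(new BufferedInputStream(fs.open(src)))
      try {
        val buf = new Array[Byte](8192)
        while (in.read(buf) != -1) {}   // read to EOF to catch truncated archives, not just bad headers
        true
      } finally in.close()
    } catch {
      case _: IOException => false      // "incorrect header check", truncation, etc.
    }
  if (readable) FileUtil.copy(fs, src, fs, validated, false, conf)
}

This still ends up decompressing every file twice (once to validate, once in the streaming job), which is why I am not sure it is really better than dropping structured streaming for this source altogether.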

Any ideas are highly appreciated!
