Hi, I have a structured streaming job which listens to a hdfs folder containing jsonl.gz files. The job crashed due to error:
java.io.IOException: incorrect header check at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method) at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225) at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111) at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176) at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152) at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192) at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39) at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Is there a way to skip the gz files that cannot be decompressed? Exception handling seems not help. The only workaround I can think of is to decompress the gz files into another folder first and make the spark streaming job listen to this new folder. But this workaround may not be better compared with the solution using a unstructured streaming job to directly decompress the gz file, read jsonl file, validate the records and write the validated records into parquet. Any idea is highly appreciated!