Hi,

In your spark-submit command, try the config property below and see if it solves the problem.
--conf spark.sql.files.ignoreCorruptFiles=true

For me this worked to ignore empty or partially uploaded gzip files in an S3 bucket.

Akshay Bhardwaj
+91-97111-33849


On Thu, Mar 7, 2019 at 11:28 AM Lian Jiang <jiangok2...@gmail.com> wrote:

> Hi,
>
> I have a structured streaming job which listens to an HDFS folder
> containing jsonl.gz files. The job crashed with this error:
>
> java.io.IOException: incorrect header check
>         at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
>         at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
>         at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
>         at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
>         at java.io.InputStream.read(InputStream.java:101)
>         at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
>         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
>         at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
>         at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
>         at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>         at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>         at org.apache.spark.scheduler.Task.run(Task.scala:109)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> Is there a way to skip the gz files that cannot be decompressed? Exception
> handling does not seem to help. The only workaround I can think of is to
> decompress the gz files into another folder first and make the Spark
> streaming job listen to this new folder. But this workaround may not be
> better than using an unstructured streaming job to directly decompress the
> gz files, read the jsonl files, validate the records, and write the
> validated records to parquet.
>
> Any idea is highly appreciated!
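If ignoreCorruptFiles does not fit, the pre-screening workaround mentioned in the quoted message can be approximated by validating each gzip file before the streaming job's watched folder ever sees it. A minimal sketch using only Python's standard-library gzip module; the directory layout and function names here are my own illustration, not anything from this thread:

```python
import gzip
import os
import shutil

def is_readable_gzip(path, chunk_size=1 << 20):
    """Return True if the whole file decompresses cleanly, False otherwise."""
    try:
        with gzip.open(path, "rb") as f:
            # Read to EOF in chunks; a bad header or truncated member raises here.
            while f.read(chunk_size):
                pass
        return True
    except (OSError, EOFError):
        return False

def copy_valid_gzips(src_dir, dst_dir):
    """Copy only fully decompressable .gz files into the folder the streaming job watches."""
    os.makedirs(dst_dir, exist_ok=True)
    copied = []
    for name in sorted(os.listdir(src_dir)):
        if not name.endswith(".gz"):
            continue
        src = os.path.join(src_dir, name)
        if is_readable_gzip(src):
            shutil.copy2(src, dst_dir)
            copied.append(name)
    return copied
```

A partially uploaded or corrupt file fails the full read and is simply skipped, so the job reading the destination folder never hits "incorrect header check"; the cost is one extra decompression pass per file.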