Hi, does anyone have any idea how to fix this error? We are consuming data from a Kafka topic with 105 partitions using Spark Structured Streaming. Every hour, 5–6 batches fail due to this error.
I couldn’t find any solution anywhere. 22/05/05 10:37:01 INFO impl.PhysicalFsWriter: ORC writer created for path: /dl/mb/hh/raw/streaming/topic=gh***_l**_c/datetime=202205051000/part-00080-a38c9fc9-79c7-4c1d-abfc-ac53aef8b089.c000.snappy.orc with stripeSize: 67108864 blockSize: 268435456 compression: SNAPPY bufferSize: 262144 22/05/05 10:37:01 INFO impl.WriterImpl: ORC writer created for path: /dl/mb/hh/raw/streaming/topic=gh***_l**_c/datetime=202205051000/part-00080-a38c9fc9-79c7-4c1d-abfc-ac53aef8b089.c000.snappy.orc with stripeSize: 67108864 blockSize: 268435456 compression: SNAPPY bufferSize: 262144 22/05/05 10:37:20 INFO executor.Executor: Finished task 1.0 in stage 10.0 (TID 1051). 2298 bytes result sent to driver 22/05/05 10:37:20 INFO executor.Executor: Finished task 32.0 in stage 10.0 (TID 1086). 2255 bytes result sent to driver 22/05/05 10:37:20 INFO executor.Executor: Executor is trying to kill task 80.0 in stage 10.0 (TID 1121), reason: Stage cancelled 22/05/05 10:37:20 ERROR util.Utils: Aborting task 22/05/05 10:37:20 ERROR util.Utils: Aborting task org.apache.spark.TaskKilledException at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149) at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:112) at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$1.loadNext(UnsafeSorterSpillMerger.java:82) at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.advanceNext(UnsafeExternalRowSorter.java:189) at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:249) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247) at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1368) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:253) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:174) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 22/05/05 10:37:20 WARN hdfs.DataStreamer: Caught exception java.lang.InterruptedException at java.base/java.lang.Object.wait(Native Method) at java.base/java.lang.Thread.join(Thread.java:1305) at java.base/java.lang.Thread.join(Thread.java:1379) at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:1001) at org.apache.hadoop.hdfs.DataStreamer.closeInternal(DataStreamer.java:850) at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:846) 22/05/05 10:37:20 WARN util.Utils: Suppressing exception in catch: Interrupted while waiting for data to be acknowledged by pipeline java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline at org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:904) at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:781) at 
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:898) at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:850) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) at org.apache.orc.impl.PhysicalFsWriter.close(PhysicalFsWriter.java:256) at org.apache.orc.impl.WriterImpl.close(WriterImpl.java:611) at org.apache.orc.mapreduce.OrcMapreduceRecordWriter.close(OrcMapreduceRecordWriter.java:81) at org.apache.spark.sql.execution.datasources.orc.OrcOutputWriter.close(OrcOutputWriter.scala:58) at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57) at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.abort(FileFormatDataWriter.scala:83) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:255) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1377) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:253) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:174) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Thanks & Regards, Nayan Sharma *+91-8095382952* <https://www.linkedin.com/in/nayan-sharma> <http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>