Hi all, I've got a problem that really has me stumped. I'm running a
Structured Streaming query that reads from Kafka, performs some
transformations and stateful aggregations (using flatMapGroupsWithState),
and outputs any updated aggregates to another Kafka topic.
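
For context, the query has roughly the shape below. This is only a minimal sketch to show the structure - the topic names, brokers, schema, and the Event/Agg case classes and update logic are placeholders, not the real job:

import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, value: Long)          // placeholder input record
case class Agg(key: String, count: Long, sum: Long) // placeholder running aggregate

val spark = SparkSession.builder.appName("stateful-agg").getOrCreate()
import spark.implicits._

// Read from Kafka and parse the JSON payload into Event rows.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "input-topic")
  .load()
  .select(from_json($"value".cast("string"), Encoders.product[Event].schema).as("e"))
  .select($"e.*").as[Event]

// Stateful aggregation: merge the batch into the per-key state and emit the updated aggregate.
def update(key: String, rows: Iterator[Event], state: GroupState[Agg]): Iterator[Agg] = {
  val prev = state.getOption.getOrElse(Agg(key, 0L, 0L))
  val batch = rows.toSeq
  val next = Agg(key, prev.count + batch.size, prev.sum + batch.map(_.value).sum)
  state.update(next)
  Iterator(next)
}

val updates = events
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(update _)

// Write the updated aggregates back out to Kafka.
updates.toJSON.toDF("value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "hdfs:///checkpoints/stateful-agg")
  .outputMode("update")
  .start()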

I'm running this job using Spark 2.4.4 on Amazon EMR 5.28.1.
Semi-regularly, all the tasks except one will complete, and the one
remaining task will take 1-2 minutes to complete instead of 1-2 seconds.
I've checked the number of input records (and overall size) for that task,
and everything seems in line with all the other tasks - there's no visible
skew.
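
The per-task numbers I'm comparing are the ones Spark reports in its task metrics (the same values shown in the UI's task table). For completeness, a listener along these lines - an illustrative sketch, not our actual tooling - logs them per task, including shuffle-read records in case the imbalance is on the shuffle side:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sketch: log per-task input and shuffle-read sizes so tasks within a stage can be compared.
class InputSizeListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) {
      println(s"stage=${taskEnd.stageId} task=${taskEnd.taskInfo.index} " +
        s"inputRecords=${m.inputMetrics.recordsRead} inputBytes=${m.inputMetrics.bytesRead} " +
        s"shuffleRecords=${m.shuffleReadMetrics.recordsRead} " +
        s"durationMs=${taskEnd.taskInfo.duration}")
    }
  }
}

spark.sparkContext.addSparkListener(new InputSizeListener)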

The only thing I have to go on at the moment is that the thread dump on the
executor that is hung shows a 'state-store-maintenance-task' thread, which
is blocked on an "Executor task launch worker" thread - that second thread
shows as TIMED_WAITING, with the following locks:

Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1569026152}),
Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171}),
Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316}),
Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777})

And a stack of:

java.lang.Object.wait(Native Method)
org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:877)
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:736)
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:846) => holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805) => holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:145) => holding Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316})
net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:193)
java.io.FilterOutputStream.close(FilterOutputStream.java:159)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:417)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:287) => holding Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777})
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:132)
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)

Based on this, I'm guessing there's some kind of delay in the HDFS-backed
state store commit - the task appears to be stuck waiting for HDFS write
acks while closing the delta file - but my NameNode and DataNode metrics
all look good (no large GCs, plenty of free memory, network bandwidth isn't
saturated, no under-replicated blocks).
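
For reference, a StreamingQueryListener along the lines of the sketch below (illustrative only, not our actual code) should show whether the extra minute lands in the addBatch phase and how the state-store metrics look for the slow batches:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Sketch: log per-batch phase durations and state operator metrics.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // durationMs breaks the trigger down into phases such as "addBatch" and "getBatch".
    println(s"batch=${p.batchId} durations=${p.durationMs}")
    // stateOperators reports rows and memory held by the state store per stateful operator.
    p.stateOperators.foreach { s =>
      println(s"  state: numRowsTotal=${s.numRowsTotal} " +
        s"numRowsUpdated=${s.numRowsUpdated} memoryUsedBytes=${s.memoryUsedBytes}")
    }
  }
})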

Has anyone run into a problem like this before? Any help would be greatly
appreciated!

Regards,
Will
