Re: Structured Streaming - HDFS State Store Performance Issues

2020-01-14 Thread Gourav Sengupta
Hi Will,

Have you tried using S3 as the state store location, with the EMR
option for faster file sync enabled? There is also now the option of
using FSx for Lustre.
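
For concreteness, a minimal sketch of the Spark side of that idea,
assuming a toy query: the state store delta and snapshot files live
under the query's checkpoint location, so pointing checkpointLocation
at an S3 path is what moves them off HDFS. The bucket, prefix, and
query below are placeholders, and any EMR-side file-sync tuning is
cluster configuration that is not shown here.

import org.apache.spark.sql.SparkSession

object S3CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("s3-checkpoint-sketch").getOrCreate()

    // Built-in `rate` source plus a streaming aggregation, just to exercise the state store.
    val counts = spark.readStream.format("rate").load()
      .groupBy("value").count()

    counts.writeStream
      .format("console")
      .outputMode("update")
      // On EMR the s3:// scheme resolves to EMRFS; elsewhere s3a:// would be used.
      .option("checkpointLocation", "s3://my-bucket/checkpoints/demo-query") // placeholder path
      .start()
      .awaitTermination()
  }
}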

Thanks and Regards,
Gourav Sengupta

On Wed, Jan 15, 2020 at 5:17 AM William Briggs wrote:

> Hi all, I've got a problem that really has me stumped. I'm running a
> Structured Streaming query that reads from Kafka, performs some
> transformations and stateful aggregations (using flatMapGroupsWithState),
> and outputs any updated aggregates to another Kafka topic.
> [...]


Structured Streaming - HDFS State Store Performance Issues

2020-01-14 Thread William Briggs
Hi all, I've got a problem that really has me stumped. I'm running a
Structured Streaming query that reads from Kafka, performs some
transformations and stateful aggregations (using flatMapGroupsWithState),
and outputs any updated aggregates to another Kafka topic.
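
For context, here is a minimal sketch of that kind of pipeline, with
placeholder brokers, topics, schema, and aggregation logic rather than
the actual job:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical per-key running total; the real job's state and logic are not shown here.
case class RunningTotal(key: String, count: Long)

object StatefulKafkaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("stateful-kafka-sketch").getOrCreate()
    import spark.implicits._

    // Read from Kafka (placeholder brokers/topic) and project key/value as strings.
    val records = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker-1:9092")
      .option("subscribe", "events-in")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Update the per-key state and emit the new aggregate for every group seen in the batch.
    def update(key: String, rows: Iterator[(String, String)],
               state: GroupState[RunningTotal]): Iterator[RunningTotal] = {
      val prev = state.getOption.getOrElse(RunningTotal(key, 0L))
      val next = prev.copy(count = prev.count + rows.size)
      state.update(next)
      Iterator(next)
    }

    val updates = records
      .groupByKey { case (k, _) => k }
      .flatMapGroupsWithState[RunningTotal, RunningTotal](
        OutputMode.Update(), GroupStateTimeout.NoTimeout())(update)

    // Write updated aggregates back to Kafka (placeholder topic and checkpoint path).
    updates
      .selectExpr("key", "CAST(count AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker-1:9092")
      .option("topic", "aggregates-out")
      .option("checkpointLocation", "hdfs:///checkpoints/stateful-kafka-sketch")
      .outputMode("update")
      .start()
      .awaitTermination()
  }
}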

I'm running this job using Spark 2.4.4 on Amazon EMR 5.28.1.
Semi-regularly, all tasks except one will complete quickly, and the one
remaining task will take 1-2 minutes instead of 1-2 seconds. I've
checked the number of input records (and overall input size) for that
task, and everything is in line with the other tasks - there's no
visible skew.

The only thing I have to go on at the moment is the thread dump on the
hung executor: it shows a 'state-store-maintenance-task' thread that is
blocked by an "Executor task launch worker" thread, and that second
thread is in TIMED_WAITING while holding the following locks:

Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1569026152}),
Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171}),
Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316}),
Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777})

And a stack of:

java.lang.Object.wait(Native Method)
org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:877)
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:736)
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:846) => holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805) => holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:145) => holding Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316})
net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:193)
java.io.FilterOutputStream.close(FilterOutputStream.java:159)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:417)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:287) => holding Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777})
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:132)
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)
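
For reference, the 'state-store-maintenance-task' thread above is the
executor-side maintenance loop of the state store, which periodically
snapshots and cleans up old delta files. Below is a hypothetical sketch
of the settings that drive it; the values shown are simply the Spark
2.4 defaults, spelled out for illustration.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("state-store-config-sketch") // placeholder app name
  // Interval at which each executor runs the 'state-store-maintenance-task' thread.
  .config("spark.sql.streaming.stateStore.maintenanceInterval", "60s")
  // Minimum number of batches whose state/checkpoint files must be retained.
  .config("spark.sql.streaming.minBatchesToRetain", "100")
  // Spark 2.4 ships a single built-in provider, the one seen in the stack trace.
  .config("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
  .getOrCreate()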

Based on this, I'm guessing the slow task is stuck committing its
state: it is waiting on HDFS write-pipeline acks while closing the
state store delta file, which points to some kind of delay in the
HDFS-backed state store. But my NameNode and DataNode metrics all look
good (no large GCs, plenty of free memory, network bandwidth isn't
saturated, no under-replicated blocks).
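
One way to confirm where the extra time goes is the query's progress
events, which report per-trigger phase timings and per-operator state
store metrics. A minimal, hypothetical sketch (the query handle and the
plain println logging are placeholders):

import org.apache.spark.sql.streaming.StreamingQuery

def logStateMetrics(query: StreamingQuery): Unit = {
  val progress = query.lastProgress // null until the first trigger completes
  if (progress != null) {
    // Per-trigger phase timings in milliseconds (e.g. addBatch covers batch execution).
    println(s"durationMs = ${progress.durationMs}")
    // Per stateful operator: rows held in the state store and memory used by the provider.
    progress.stateOperators.foreach { op =>
      println(s"numRowsTotal=${op.numRowsTotal}, numRowsUpdated=${op.numRowsUpdated}, " +
        s"memoryUsedBytes=${op.memoryUsedBytes}")
    }
  }
}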

Has anyone run into a problem like this before? Any help would be greatly
appreciated!

Regards,
Will