BTW, this is running on Spark 2.1.1.

I have been trying to debug this issue, and what I have found so far is
that it is somehow related to the Spark WAL. The directory named
<checkpoint dir in HDFS>/receivedBlockMetadata seems to stop getting
written to after an HDFS node is killed and restarted.
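
A minimal sketch of one way to check whether that directory is still being
written to (the checkpoint path here is a placeholder, and the Hadoop
FileSystem calls are just for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Placeholder; substitute the actual <checkpoint dir in HDFS> path.
Path walDir = new Path("hdfs:///path/to/checkpoint/receivedBlockMetadata");
FileSystem fs = FileSystem.get(walDir.toUri(), new Configuration());
for (FileStatus status : fs.listStatus(walDir)) {
    // Print each WAL log file and the last time it was written to.
    System.out.println(status.getPath() + " last modified " + status.getModificationTime());
}

I have a couple of questions about this: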

1. Why is the Flume receiver writing to the WAL at all? I have not enabled
the 'spark.streaming.receiver.writeAheadLog.enable' property, and even
after I set it explicitly to false in the driver (as in the sketch after
these questions), the WAL still seems to be getting written to.

2. Why would the receiver receive metadata but not write it to the WAL
after an HDFS node is lost and restarted? The HDFS replication factor is at
its default of 2.
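
For reference, the property from question 1 is set in the driver roughly
like this, before the streaming context is created (a minimal sketch; the
app name and batch interval are placeholders, not our actual values):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("flume-polling-app")  // placeholder app name
    // explicitly disable the receiver write-ahead log
    .set("spark.streaming.receiver.writeAheadLog.enable", "false");
JavaStreamingContext streamingContext =
    new JavaStreamingContext(conf, Durations.seconds(10));  // placeholder batch interval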

Thanks
N B


On Mon, Jun 19, 2017 at 6:23 PM, N B <nb.nos...@gmail.com> wrote:

> Hi all,
>
> We are running a Standalone Spark Cluster for running a streaming
> application. The application consumes data from Flume using a Flume Polling
> stream created as such :
>
> flumeStream = FlumeUtils.createPollingStream(streamingContext,
>     socketAddress.toArray(new InetSocketAddress[socketAddress.size()]),
>     StorageLevel.MEMORY_AND_DISK_SER(), 100, 5);
>
>
> The checkpoint directory is configured to be on an HDFS cluster and Spark
> workers have their SPARK_LOCAL_DIRS and SPARK_WORKER_DIR defined to be on
> their respective local filesystems.
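>
> A rough sketch of that wiring (the HDFS path and local paths below are
> placeholders, not our actual values):
>
> // Checkpoint directory on HDFS (placeholder path).
> streamingContext.checkpoint("hdfs://namenode:8020/spark/checkpoint");
>
> // In spark-env.sh on each worker (placeholder local paths):
> // export SPARK_LOCAL_DIRS=/data/spark/local
> // export SPARK_WORKER_DIR=/data/spark/work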
>
> What we are seeing is some odd behavior that we are unable to explain.
> During normal operation, everything runs as expected, with Flume
> delivering events to Spark. However, while the application is running, if
> I kill one of the HDFS nodes (it does not matter which one), the Flume
> receiver in Spark stops delivering any data to the downstream processing.
>
> I enabled debug logging for org.apache.spark.streaming.flume on the Spark
> worker nodes and looked at the logs for the worker that runs the Flume
> receiver. It keeps chugging along, receiving data from Flume as shown in
> the log sample below, but the resulting batches in the stream start
> containing 0 records as soon as the HDFS node is killed, with no errors
> produced to indicate any issue.
>
> 17/06/20 01:05:42 DEBUG FlumeBatchFetcher: Ack sent for sequence number: 09fa05f59050
> 17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Received batch of 100 events with sequence number: 09fa05f59052
> 17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Sending ack for sequence number: 09fa05f59052
> 17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Ack sent for sequence number: 09fa05f59052
> 17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Received batch of 100 events with sequence number: 09fa05f59054
> 17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Sending ack for sequence number: 09fa05f59054
>
> The driver output for the application shows (printed via
> DStream.count().map().print()):
>
> -------------------------------------------
> Time: 1497920770000 ms
> -------------------------------------------
> Received 0     flume events.
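>
> (The line above comes from a pipeline roughly like the following; the
> exact message formatting in the map step is illustrative:)
>
> flumeStream.count()
>     .map(count -> "Received " + count + " flume events.")
>     .print();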
>
>
> Any insights about where to look in order to find the root cause will be
> greatly appreciated.
>
> Thanks
> N B
>
>
