I'm running into a weird issue with a stateful streaming job.
(Spark 2.1.0, reading from a Kafka 0-10 direct input stream.)

From my reading of the docs, the default checkpoint interval for stateful
streaming is 10 * batchInterval. Since my batch interval is 10 seconds, I
would expect checkpointing to happen only every 100 seconds. But what I'm
seeing is that Spark is not only checkpointing every 10 seconds, it's
checkpointing twice every 10 seconds!
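
To make the expected numbers explicit, here is a minimal stdlib-only Scala sketch of the arithmetic, assuming the 10 * batchInterval default described above. `CheckpointMath` and its methods are hypothetical names for illustration, not part of Spark:

```scala
// Hypothetical helper: models the default described above, where a stateful
// (mapWithState) stream is checkpointed once every 10 batch intervals.
object CheckpointMath {
  // Assumed multiplier, taken from the "10 * batchInterval" default.
  val DefaultMultiplier = 10

  // Expected checkpoint interval for a given batch interval.
  def expectedCheckpointIntervalMs(batchIntervalMs: Long): Long =
    batchIntervalMs * DefaultMultiplier

  // Checkpoints expected in a time window if one is written per interval.
  def expectedCheckpoints(windowMs: Long, batchIntervalMs: Long): Long =
    windowMs / expectedCheckpointIntervalMs(batchIntervalMs)

  // Checkpoints seen in a time window if every batch writes `writesPerBatch` files.
  def observedCheckpoints(windowMs: Long, batchIntervalMs: Long, writesPerBatch: Int): Long =
    (windowMs / batchIntervalMs) * writesPerBatch

  def main(args: Array[String]): Unit = {
    val batchMs = 10000L
    println(expectedCheckpointIntervalMs(batchMs))
    println(expectedCheckpoints(100000L, batchMs))
    println(observedCheckpoints(100000L, batchMs, 2))
  }
}
```

With a 10-second batch interval this gives a 100000 ms interval, i.e. one checkpoint per 100 seconds, versus 20 writes per 100 seconds if every batch writes twice.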

My code looks roughly like this:

    val eventStream = kafkaStream.
        transform(
...
        ).
        map(
...
        ).
        transform(
...
        )

    val stateStream = eventStream.mapWithState(
...
    )

    stateStream.foreachRDD(
...
    )


When the app initializes, the checkpointing configuration looks like so:

17/06/01 21:19:05 INFO DirectKafkaInputDStream: Duration for remembering RDDs set to 200000 ms for org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@4a85a52c
17/06/01 21:19:05 INFO DirectKafkaInputDStream: Slide time = 10000 ms
17/06/01 21:19:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
17/06/01 21:19:05 INFO DirectKafkaInputDStream: Checkpoint interval = null
17/06/01 21:19:05 INFO DirectKafkaInputDStream: Remember interval = 200000 ms
17/06/01 21:19:05 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@4a85a52c
17/06/01 21:19:05 INFO TransformedDStream: Slide time = 10000 ms
17/06/01 21:19:05 INFO TransformedDStream: Storage level = Serialized 1x Replicated
17/06/01 21:19:05 INFO TransformedDStream: Checkpoint interval = null
17/06/01 21:19:05 INFO TransformedDStream: Remember interval = 200000 ms
17/06/01 21:19:05 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@201d4bfb
17/06/01 21:19:05 INFO MappedDStream: Slide time = 10000 ms
17/06/01 21:19:05 INFO MappedDStream: Storage level = Serialized 1x Replicated
17/06/01 21:19:05 INFO MappedDStream: Checkpoint interval = null
17/06/01 21:19:05 INFO MappedDStream: Remember interval = 200000 ms
17/06/01 21:19:05 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@1208bde7
17/06/01 21:19:05 INFO TransformedDStream: Slide time = 10000 ms
17/06/01 21:19:05 INFO TransformedDStream: Storage level = Serialized 1x Replicated
17/06/01 21:19:05 INFO TransformedDStream: Checkpoint interval = null
17/06/01 21:19:05 INFO TransformedDStream: Remember interval = 200000 ms
17/06/01 21:19:05 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@370b0505
17/06/01 21:19:05 INFO InternalMapWithStateDStream: Slide time = 10000 ms
17/06/01 21:19:05 INFO InternalMapWithStateDStream: Storage level = Memory Deserialized 1x Replicated
17/06/01 21:19:05 INFO InternalMapWithStateDStream: Checkpoint interval = 100000 ms
17/06/01 21:19:05 INFO InternalMapWithStateDStream: Remember interval = 200000 ms
17/06/01 21:19:05 INFO InternalMapWithStateDStream: Initialized and validated org.apache.spark.streaming.dstream.InternalMapWithStateDStream@746c7658
17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Slide time = 10000 ms
17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Storage level = Serialized 1x Replicated
17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Checkpoint interval = null
17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Remember interval = 10000 ms
17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Initialized and validated org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@75d7326b
17/06/01 21:19:05 INFO ForEachDStream: Slide time = 10000 ms
17/06/01 21:19:05 INFO ForEachDStream: Storage level = Serialized 1x Replicated
17/06/01 21:19:05 INFO ForEachDStream: Checkpoint interval = null
17/06/01 21:19:05 INFO ForEachDStream: Remember interval = 10000 ms
17/06/01 21:19:05 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@2b3b2628


Note that one line does correctly show the 100-second checkpoint interval:

17/06/01 21:19:05 INFO InternalMapWithStateDStream: Checkpoint interval = 100000 ms


And yet the app still checkpoints every 10 seconds... twice every 10
seconds, in fact!

17/06/01 21:19:10 INFO CheckpointWriter: Submitted checkpoint of time 1496351950000 ms to writer queue
17/06/01 21:19:10 INFO CheckpointWriter: Saving checkpoint for time 1496351950000 ms to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351950000'
17/06/01 21:19:10 INFO CheckpointWriter: Checkpoint for time 1496351950000 ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351950000', took 8324 bytes and 165 ms
17/06/01 21:19:11 INFO CheckpointWriter: Submitted checkpoint of time 1496351950000 ms to writer queue
17/06/01 21:19:11 INFO CheckpointWriter: Saving checkpoint for time 1496351950000 ms to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351950000'
17/06/01 21:19:11 INFO CheckpointWriter: Checkpoint for time 1496351950000 ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351950000', took 8321 bytes and 22 ms
17/06/01 21:19:20 INFO CheckpointWriter: Submitted checkpoint of time 1496351960000 ms to writer queue
17/06/01 21:19:20 INFO CheckpointWriter: Saving checkpoint for time 1496351960000 ms to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351960000'
17/06/01 21:19:20 INFO CheckpointWriter: Checkpoint for time 1496351960000 ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351960000', took 8390 bytes and 20 ms
17/06/01 21:19:20 INFO CheckpointWriter: Submitted checkpoint of time 1496351960000 ms to writer queue
17/06/01 21:19:20 INFO CheckpointWriter: Saving checkpoint for time 1496351960000 ms to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351960000'
17/06/01 21:19:20 INFO CheckpointWriter: Checkpoint for time 1496351960000 ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351960000', took 8386 bytes and 24 ms
17/06/01 21:19:30 INFO CheckpointWriter: Submitted checkpoint of time 1496351970000 ms to writer queue
17/06/01 21:19:30 INFO CheckpointWriter: Saving checkpoint for time 1496351970000 ms to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351970000'
17/06/01 21:19:30 INFO CheckpointWriter: Checkpoint for time 1496351970000 ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351970000', took 8398 bytes and 25 ms
17/06/01 21:19:30 INFO CheckpointWriter: Submitted checkpoint of time 1496351970000 ms to writer queue
17/06/01 21:19:30 INFO CheckpointWriter: Saving checkpoint for time 1496351970000 ms to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351970000'
17/06/01 21:19:30 INFO CheckpointWriter: Checkpoint for time 1496351970000 ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351970000', took 8394 bytes and 22 ms
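
To check whether the double write holds across a longer run, a small stdlib-only Scala sketch like this one can count how many times each checkpoint time was saved. It assumes one log entry per line (as Spark writes them, before any email line wrapping) and matches only the "Checkpoint for time ... ms saved to file" message; `CheckpointTally` and `savesPerTime` are hypothetical names:

```scala
import scala.io.Source
import scala.util.matching.Regex

// Hypothetical diagnostic: tally how many times each checkpoint time was saved,
// based on CheckpointWriter "saved to file" log lines.
object CheckpointTally {
  // Captures the checkpoint time (in ms) from a completed-save log line.
  private val Saved: Regex = """Checkpoint for time (\d+) ms saved to file""".r

  // Maps each checkpoint time to the number of completed saves for it.
  def savesPerTime(lines: Seq[String]): Map[Long, Int] =
    lines
      .flatMap(line => Saved.findFirstMatchIn(line).map(_.group(1).toLong))
      .groupBy(identity)
      .map { case (time, hits) => time -> hits.size }

  // Reads log lines from stdin and prints the counts in time order.
  def main(args: Array[String]): Unit = {
    val lines = Source.stdin.getLines().toList
    savesPerTime(lines).toSeq.sortBy(_._1).foreach { case (time, n) =>
      println(s"$time -> $n save(s)")
    }
  }
}
```

Fed the driver log on stdin, it prints one line per checkpoint time; for the excerpt above, every time would show 2 saves.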


Does anyone have an idea what's going wrong here, and/or how to fix it?

Thanks,

DR
