Hi All,

The default value for spark.streaming.blockQueueSize is 10 in
https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala.
In the Spark Kinesis ASL for 1.4, the received Kinesis records are stored
by calling addData on line 115 -
https://github.com/apache/spark/blob/branch-1.4/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala#L115
- which pushes one data item to the buffer. This is a problem because, at
application startup, a single Kinesis Worker gains the lease for all (or a
majority of) the shards of the Kinesis stream. This is by design; the KCL
load-balances as new Workers are started. But the single Worker that
initially gains the lease for many shards ends up blocked in the addData
method, as there will be many KinesisRecordProcessor threads trying to add
the received data to the buffer. The buffer uses an ArrayBlockingQueue
whose size is specified by spark.streaming.blockQueueSize, which defaults
to 10. The ArrayBlockingQueue is flushed out to the MemoryStore every
100 ms, so the KinesisRecordProcessor threads can stay blocked for a long
period (up to an hour) at application startup. The impact is that some
Kinesis shards are not consumed by the Spark Streaming application until
their KinesisRecordProcessor threads get unblocked.
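
To make sure I'm reading the blocking behaviour correctly, here is a tiny
standalone sketch (not the actual BlockGenerator code, just the
ArrayBlockingQueue pattern as I understand it): many producer threads
calling put() on a queue of capacity 10 spend most of their time blocked
when a single consumer only drains the queue every 100 ms.

import java.util.concurrent.ArrayBlockingQueue

object QueueBlockingSketch {
  def main(args: Array[String]): Unit = {
    // Capacity 10, mirroring the spark.streaming.blockQueueSize default.
    val queue = new ArrayBlockingQueue[String](10)

    // 20 producers standing in for KinesisRecordProcessor threads (one per shard).
    for (shard <- 1 to 20) {
      new Thread(new Runnable {
        override def run(): Unit = {
          var i = 0
          while (true) {
            queue.put(s"shard-$shard record $i") // blocks while the queue is full
            i += 1
          }
        }
      }).start()
    }

    // One consumer standing in for the periodic flush out of the queue.
    val drained = new java.util.ArrayList[String]()
    while (true) {
      Thread.sleep(100) // the flush interval described above
      drained.clear()
      queue.drainTo(drained)
      println(s"flushed ${drained.size()} items")
    }
  }
}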

To fix/work around the issue, would it be OK to increase
spark.streaming.blockQueueSize to a larger value? I suppose the main
consideration when increasing this size would be the memory allocated to
the executor. I haven't seen much documentation on this config, and any
advice on how to fine-tune it would be useful.
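
In case it is useful context, this is roughly how I was planning to bump
it - just a sketch, assuming the property can be set on SparkConf like any
other; the value 100, the app name, and the batch interval are
placeholders, not recommendations:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kinesis-consumer")               // placeholder app name
  .set("spark.streaming.blockQueueSize", "100") // placeholder value
// Rough worst-case extra memory held in the queue:
//   blockQueueSize * records per block * average record size
// which is what I would try to size against the executor memory.

val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval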

Thanks,
Spark newbie
