Hi All,

The default value of spark.streaming.blockQueueSize is 10 in https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala. In spark-kinesis-asl 1.4, received Kinesis records are stored by calling addData on line 115 (https://github.com/apache/spark/blob/branch-1.4/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala#L115), which pushes one data item at a time to the buffer.

This is a problem because, at application startup, a single Kinesis Worker acquires the lease for all (or a majority of) the shards of the Kinesis stream. That is by design: the KCL rebalances the load as new Workers start up. However, the single Worker that initially holds leases for many shards ends up blocked in the addData method, because many KinesisRecordProcessor threads are trying to add received data to the buffer at the same time. The buffer is an ArrayBlockingQueue whose capacity is spark.streaming.blockQueueSize (10 by default), and it is only flushed to the MemoryStore every 100 ms. So at application startup the KinesisRecordProcessor threads can stay blocked for a long period (up to an hour in our case). The impact is that some Kinesis shards are not consumed by the Spark Streaming application until their KinesisRecordProcessor threads get unblocked.
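To make the blocking behavior concrete, here is a minimal standalone sketch using plain java.util.concurrent (this is not Spark's actual BlockGenerator code, just an illustration of the same ArrayBlockingQueue semantics): once the capacity-10 queue is full, a producer cannot add another item until the consumer drains it, which is exactly the position the KinesisRecordProcessor threads end up in.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity 10 mirrors the default spark.streaming.blockQueueSize.
        ArrayBlockingQueue<String> buffer = new ArrayBlockingQueue<>(10);

        // Fill the buffer to capacity, as many concurrent
        // KinesisRecordProcessor threads would via addData().
        for (int i = 0; i < 10; i++) {
            buffer.put("record-" + i);
        }

        // A producer facing a full queue blocks; a timed offer() shows
        // this without hanging the demo: it waits 200 ms, then gives up.
        boolean accepted = buffer.offer("record-10", 200, TimeUnit.MILLISECONDS);
        System.out.println("accepted=" + accepted);

        // Once the periodic flush empties the queue (simulated here by
        // draining it), blocked producers can make progress again.
        buffer.clear();
        accepted = buffer.offer("record-10");
        System.out.println("accepted=" + accepted);
    }
}
```

In the real receiver the drain happens only every 100 ms and removes one block at a time, so with dozens of shards feeding one Worker the queue stays full and producers keep re-blocking.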
To fix or work around the issue, would it be OK to increase spark.streaming.blockQueueSize to a larger value? I suppose the main consideration when increasing this size would be the memory allocated to the executor. I haven't seen much documentation on this config, so any advice on how to tune it would be appreciated.

Thanks,
Spark newbie
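For reference, the override I have in mind would be passed at submit time; the values below are illustrative starting points I made up for this question, not tuned recommendations:

```shell
# Assumption: 1000 and 4g are placeholder values, not recommendations.
# A larger block queue holds more in-flight blocks in executor memory,
# so the queue size and executor memory should be raised together.
spark-submit \
  --conf spark.streaming.blockQueueSize=1000 \
  --conf spark.executor.memory=4g \
  --class com.example.MyStreamingApp \
  my-streaming-app.jar
```

(com.example.MyStreamingApp and my-streaming-app.jar are hypothetical stand-ins for the actual application.)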