Hi, I am trying to run a Spark application that ingests data from Kafka using Spark Structured Streaming and the library org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very strange issue: during every micro-batch execution, the Kafka consumer is unable to fetch the offsets it expects and has its offsets reset.
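For context, here is a stripped-down sketch of the query; the broker address, checkpoint path, and trigger interval are placeholders, not my real configuration (the sink path and topic name are the real ones from the logs below):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder().appName("kafka-ingest").getOrCreate()

    // Offsets are tracked through the query's checkpoint, not Kafka
    // consumer-group commits; Spark generates its own group.id with the
    // "spark-kafka-source-" prefix, which is what shows up in the logs.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "my_kafka_topic")
      .load()

    val query = df.writeStream
      .format("parquet") // FileSink, as in the progress event below
      .option("path", "s3://my-s3-bucket/data/kafka/my_kafka_topic")
      .option("checkpointLocation", "s3://my-s3-bucket/checkpoints/my_kafka_topic") // placeholder
      .trigger(Trigger.ProcessingTime("1 minute")) // placeholder interval
      .start()

    query.awaitTermination()

With this running, every trigger produces driver log lines like the following: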
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-5 to offset 1168959116.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-1 to offset 1218619371.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-8 to offset 1157205346.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-21 to offset 1255403059.

A reset like this would be reasonable if it happened once per application run, since the offsets stored in my checkpoint might no longer be valid and would have to be reset to new values. But I am seeing this reset happen on every micro-batch execution of my streaming job. At the end of each batch, the streaming query progress emits the following:

19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
  "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
  "name" : null,
  "timestamp" : "2019-09-10T15:55:00.000Z",
  "batchId" : 189,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 127,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 24,
    "setOffsetRange" : 36,
    "triggerExecution" : 1859,
    "walCommit" : 1032
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
    "startOffset" : {
      "my_kafka_topic" : { "23" : 1206926686, "8" : 1158514946, "17" : 1258387219, "11" : 1263091642, "2" : 1226741128, "20" : 1229560889, "5" : 1170304913, "14" : 1207333901, "4" : 1274242728, "13" : 1336386658, "22" : 1260210993, "7" : 1288639296, "16" : 1247462229, "10" : 1093157103, "1" : 1219904858, "19" : 1116269615, "9" : 1238935018, "18" : 1069224544, "12" : 1256018541, "3" : 1251150202, "21" : 1256774117, "15" : 1170591375, "6" : 1185108169, "24" : 1202342095, "0" : 1165356330 }
    },
    "endOffset" : {
      "my_kafka_topic" : { "23" : 1206928043, "8" : 1158516721, "17" : 1258389219, "11" : 1263093490, "2" : 1226743225, "20" : 1229562962, "5" : 1170307882, "14" : 1207335736, "4" : 1274245585, "13" : 1336388570, "22" : 1260213582, "7" : 1288641384, "16" : 1247464311, "10" : 1093159186, "1" : 1219906407, "19" : 1116271435, "9" : 1238936994, "18" : 1069226913, "12" : 1256020926, "3" : 1251152579, "21" : 1256776910, "15" : 1170593216, "6" : 1185110032, "24" : 1202344538, "0" : 1165358262 }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
  }
}

In the StreamingQueryProgress event above, numInputRows is zero, and this is the case for every micro-batch execution: no data is being consumed whatsoever. So for each batch my offsets are being reset and each batch produces zero rows. Since no work is being done and dynamic allocation is enabled, all my executors get killed. I have tried deleting my checkpoint and starting the application from scratch, and I am still facing the same issue. What could possibly be wrong here? What lines of investigation should I take?
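One sanity check I am considering: reading one of the reported offset ranges with a plain Kafka consumer, outside of Spark, to confirm whether the broker actually has records between the startOffset and endOffset above. A rough sketch, assuming kafka-clients 2.0+ (broker address and group id are placeholders; the offset is partition 0's startOffset from the progress event):

    import java.time.Duration
    import java.util.{Arrays, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // placeholder
    props.put("group.id", "offset-sanity-check")   // placeholder; not used for commits
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val tp = new TopicPartition("my_kafka_topic", 0)
    consumer.assign(Arrays.asList(tp))

    // Seek to the batch's reported start offset for partition 0 and poll once.
    consumer.seek(tp, 1165356330L)
    val records = consumer.poll(Duration.ofSeconds(10))
    println(s"fetched ${records.count()} records from $tp")
    consumer.close()

If this fetches records while Spark keeps reporting numInputRows = 0 over the same range, the problem is presumably on the Spark side rather than with broker retention.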
If you are interested in earning Stack Overflow points, you can answer my question on SO here: <https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>. Thanks, Charles