Do you have rate limiting set on your stream? It may be that you are trying
to process records that have passed the retention period within Kafka.

On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh <mig.flan...@gmail.com>
wrote:

>
> Hi,
>
> I am trying to run a spark application ingesting data from Kafka using the
> Spark structured streaming and the spark library
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
> issue where during execution of all my micro-batches the Kafka consumer is
> not able to fetch the offsets and its having its offsets reset as show
> below in this log
>
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-5 to offset 1168959116.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-1 to offset 1218619371.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-8 to offset 1157205346.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-21 to offset 1255403059.
>
>
> It is reasonable if this resetting happens once in application due to the
> fact that the offsets stored in my checkpoint are no longer valid and will
> have to reset our offsets to a new value. But I am seeing this reset
> happening for every micro batch execution in my streaming job. In at the
> end the streaming query progress emits the following
>
> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>   "name" : null,
>   "timestamp" : "2019-09-10T15:55:00.000Z",
>   "batchId" : 189,
>   "numInputRows" : 0,
>   "inputRowsPerSecond" : 0.0,
>   "processedRowsPerSecond" : 0.0,
>   "durationMs" : {
>     "addBatch" : 127,
>     "getBatch" : 0,
>     "getEndOffset" : 0,
>     "queryPlanning" : 24,
>     "setOffsetRange" : 36,
>     "triggerExecution" : 1859,
>     "walCommit" : 1032
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>     "startOffset" : {
>       "my_kafka_topic" : {
>         "23" : 1206926686,
>         "8" : 1158514946,
>         "17" : 1258387219,
>         "11" : 1263091642,
>         "2" : 1226741128,
>         "20" : 1229560889,
>         "5" : 1170304913,
>         "14" : 1207333901,
>         "4" : 1274242728,
>         "13" : 1336386658,
>         "22" : 1260210993,
>         "7" : 1288639296,
>         "16" : 1247462229,
>         "10" : 1093157103,
>         "1" : 1219904858,
>         "19" : 1116269615,
>         "9" : 1238935018,
>         "18" : 1069224544,
>         "12" : 1256018541,
>         "3" : 1251150202,
>         "21" : 1256774117,
>         "15" : 1170591375,
>         "6" : 1185108169,
>         "24" : 1202342095,
>         "0" : 1165356330
>       }
>     },
>     "endOffset" : {
>       "my_kafka_topic" : {
>         "23" : 1206928043,
>         "8" : 1158516721,
>         "17" : 1258389219,
>         "11" : 1263093490,
>         "2" : 1226743225,
>         "20" : 1229562962,
>         "5" : 1170307882,
>         "14" : 1207335736,
>         "4" : 1274245585,
>         "13" : 1336388570,
>         "22" : 1260213582,
>         "7" : 1288641384,
>         "16" : 1247464311,
>         "10" : 1093159186,
>         "1" : 1219906407,
>         "19" : 1116271435,
>         "9" : 1238936994,
>         "18" : 1069226913,
>         "12" : 1256020926,
>         "3" : 1251152579,
>         "21" : 1256776910,
>         "15" : 1170593216,
>         "6" : 1185110032,
>         "24" : 1202344538,
>         "0" : 1165358262
>       }
>     },
>     "numInputRows" : 0,
>     "inputRowsPerSecond" : 0.0,
>     "processedRowsPerSecond" : 0.0
>   } ],
>   "sink" : {
>     "description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
>   }
> }
>
>
> In the above StreamingQueryProgress event the numInputRows fields  is zero
> and this is the case for all micro batch executions and no data is being
> produced whatsoever. So basically for each batch my offsets are being reset
> and each batch is producing zero rows. Since there is no work being done
> and since dynamic allocation is enabled all my executors killed... I have
> tried deleting my checkpoint and started my application from scratch and I
> am still facing the same issue. What could possibly be wrong this?... what
> lines of investigation should I take?  If you are interested in getting
> Stackoverflow point you can answer my question in SO here
> <https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>.
>
>
> Thanks,
> Charles
>
>

Reply via email to