Thanks Dhaval, that fixed the issue. The constant resetting of the Kafka offsets misled me about the root cause. Please feel free to answer the SO question here <https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed> if you would like to.
On Wed, Sep 11, 2019 at 9:03 PM Dhaval Patel <mailto.dhava...@gmail.com> wrote:

> Hi Charles,
>
> Can you check whether any of the cases related to the output directory and
> checkpoint location mentioned in the link below apply in your case?
>
> https://kb.databricks.com/streaming/file-sink-streaming.html
>
> Regards,
> Dhaval
>
> On Wed, Sep 11, 2019 at 9:29 PM Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hey Charles,
>> If you are using maxOffsetsPerTrigger, you will likely reset the offsets
>> every microbatch, because:
>> 1. Spark will figure out a range of offsets to process (let's call them
>> x and y).
>> 2. If these offsets have fallen out of the retention period, Spark will
>> try to set the offset to x, which is less than z, the earliest offset
>> still available in Kafka (z > y > x).
>> 3. Since z > y, Spark will not process any of the data.
>> 4. Goto 1.
>>
>> On Wed, Sep 11, 2019, 6:09 PM Charles vinodh <mig.flan...@gmail.com>
>> wrote:
>>
>>> Hi Sandish,
>>>
>>> As I have said, if the offset reset happened only once that would make
>>> sense. But I am not sure how to explain why the offset reset is
>>> happening for every micro-batch. Ideally, once the offset reset happens
>>> the app should move to a valid offset and start consuming data, but in
>>> my case the offset is getting reset for every batch and no data is ever
>>> getting generated.
>>>
>>> Thanks,
>>> Charles
>>>
>>> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN <sanysand...@gmail.com>
>>> wrote:
>>>
>>>> You can see this kind of error if the consumer lag is greater than the
>>>> Kafka retention period. You will not see any failures if the option
>>>> below is not set.
>>>>
>>>> Set the failOnDataLoss=true option to see the failures.
>>>>
>>>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh <mig.flan...@gmail.com>
>>>> wrote:
>>>>
>>>>> The only form of rate limiting I have set is *maxOffsetsPerTrigger*
>>>>> and *fetch.message.max.bytes*.
>>>>>
>>>>> *"*may be that you are trying to process records that have passed the
>>>>> retention period within Kafka.*"*
>>>>> If the above were true then my offsets should be reset only once,
>>>>> ideally when my application starts. But my offsets are resetting for
>>>>> every batch. If my application were using offsets that are no longer
>>>>> available in Kafka, it would reset to the earliest or latest offset
>>>>> available in Kafka, and the next request made to Kafka should return
>>>>> proper data. But in my case, for all micro-batches the offsets are
>>>>> getting reset and every batch is producing no data.
>>>>>
>>>>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>>>
>>>>>> Do you have rate limiting set on your stream? It may be that you are
>>>>>> trying to process records that have passed the retention period
>>>>>> within Kafka.
>>>>>>
>>>>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh <mig.flan...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to run a Spark application ingesting data from Kafka
>>>>>>> using Spark Structured Streaming and the library
>>>>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very
>>>>>>> weird issue where, during the execution of all my micro-batches, the
>>>>>>> Kafka consumer is not able to fetch the offsets and keeps having its
>>>>>>> offsets reset, as shown below in this log:
>>>>>>>
>>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
>>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>> Resetting offset for partition my-topic-5 to offset 1168959116.
>>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
>>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>> Resetting offset for partition my-topic-1 to offset 1218619371.
>>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
>>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>> Resetting offset for partition my-topic-8 to offset 1157205346.
>>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
>>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>> Resetting offset for partition my-topic-21 to offset 1255403059.
>>>>>>>
>>>>>>> It would be reasonable if this reset happened once per application
>>>>>>> run, because the offsets stored in my checkpoint are no longer valid
>>>>>>> and have to be reset to a new value. But I am seeing this reset
>>>>>>> happen for every micro-batch execution in my streaming job. At the
>>>>>>> end, the streaming query progress emits the following:
>>>>>>>
>>>>>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>>>>>>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>>>>>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>>>>>>   "name" : null,
>>>>>>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>>>>>>   "batchId" : 189,
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "addBatch" : 127,
>>>>>>>     "getBatch" : 0,
>>>>>>>     "getEndOffset" : 0,
>>>>>>>     "queryPlanning" : 24,
>>>>>>>     "setOffsetRange" : 36,
>>>>>>>     "triggerExecution" : 1859,
>>>>>>>     "walCommit" : 1032
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>>>>>>>     "startOffset" : {
>>>>>>>       "my_kafka_topic" : {
>>>>>>>         "23" : 1206926686, "8" : 1158514946, "17" : 1258387219,
>>>>>>>         "11" : 1263091642, "2" : 1226741128, "20" : 1229560889,
>>>>>>>         "5" : 1170304913, "14" : 1207333901, "4" : 1274242728,
>>>>>>>         "13" : 1336386658, "22" : 1260210993, "7" : 1288639296,
>>>>>>>         "16" : 1247462229, "10" : 1093157103, "1" : 1219904858,
>>>>>>>         "19" : 1116269615, "9" : 1238935018, "18" : 1069224544,
>>>>>>>         "12" : 1256018541, "3" : 1251150202, "21" : 1256774117,
>>>>>>>         "15" : 1170591375, "6" : 1185108169, "24" : 1202342095,
>>>>>>>         "0" : 1165356330
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "endOffset" : {
>>>>>>>       "my_kafka_topic" : {
>>>>>>>         "23" : 1206928043, "8" : 1158516721, "17" : 1258389219,
>>>>>>>         "11" : 1263093490, "2" : 1226743225, "20" : 1229562962,
>>>>>>>         "5" : 1170307882, "14" : 1207335736, "4" : 1274245585,
>>>>>>>         "13" : 1336388570, "22" : 1260213582, "7" : 1288641384,
>>>>>>>         "16" : 1247464311, "10" : 1093159186, "1" : 1219906407,
>>>>>>>         "19" : 1116271435, "9" : 1238936994, "18" : 1069226913,
>>>>>>>         "12" : 1256020926, "3" : 1251152579, "21" : 1256776910,
>>>>>>>         "15" : 1170593216, "6" : 1185110032, "24" : 1202344538,
>>>>>>>         "0" : 1165358262
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> In the above StreamingQueryProgress event the numInputRows field is
>>>>>>> zero, and this is the case for all micro-batch executions; no data
>>>>>>> is being produced whatsoever. So basically for each batch my offsets
>>>>>>> are being reset and each batch is producing zero rows. Since there
>>>>>>> is no work being done, and since dynamic allocation is enabled, all
>>>>>>> my executors get killed. I have tried deleting my checkpoint and
>>>>>>> starting my application from scratch, and I am still facing the same
>>>>>>> issue.
>>>>>>> What could possibly be wrong with this? What lines of investigation
>>>>>>> should I take? If you are interested in getting Stack Overflow
>>>>>>> points, you can answer my question on SO here
>>>>>>> <https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Charles
>>>>>>>
>>>> --
>>>> Sent from Gmail Mobile
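[Archive note] For anyone skimming this thread later: the reset loop Burak describes (plan range x..y, find the retained earliest offset z with z > y > x, process nothing, goto 1) can be sketched as a tiny simulation. All offset numbers below are made up for illustration, not taken from the job above:

```python
# Simulation of the offset-reset loop described in the thread
# (hypothetical offsets). x..y is the range Spark plans to read,
# bounded by maxOffsetsPerTrigger; z is the earliest offset Kafka
# still retains. If z > y, the whole planned range has aged out,
# so the batch reads nothing and the next batch replans the same range.

def run_microbatch(checkpoint_offset, max_offsets_per_trigger, earliest_available):
    x = checkpoint_offset                  # planned start offset
    y = x + max_offsets_per_trigger        # planned end offset
    z = earliest_available                 # earliest offset Kafka still has
    if z > y:
        # Entire planned range fell out of retention: the consumer
        # "resets" but the batch end y is already behind z -> zero rows,
        # and the checkpointed start offset never advances ("Goto 1").
        return checkpoint_offset, 0
    start = max(x, z)                      # skip any aged-out prefix
    return y, y - start                    # new offset, rows processed

offset, rows = 1_000, 0
for batch in range(3):
    offset, rows = run_microbatch(offset, max_offsets_per_trigger=500,
                                  earliest_available=10_000)
    print(f"batch {batch}: numInputRows={rows}, next start offset={offset}")
# Every batch reports numInputRows=0, matching the progress JSON above.
```

This is why deleting the checkpoint alone may not help if the new starting offsets also lag the retention window by the time the first batch runs.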
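[Archive note] The reader options discussed in the thread (maxOffsetsPerTrigger, failOnDataLoss) could be set roughly as below. This is a minimal sketch, not the poster's actual job; broker address, topic name, and the trigger limit are placeholders:

```python
# Minimal Structured Streaming Kafka reader sketch (placeholder names).
# failOnDataLoss=true makes the query fail loudly when checkpointed
# offsets have aged out of Kafka retention, instead of silently
# resetting and emitting zero-row batches every trigger.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-offset-debug").getOrCreate()

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092")  # placeholder
          .option("subscribe", "my_kafka_topic")
          .option("startingOffsets", "earliest")
          .option("maxOffsetsPerTrigger", 100000)  # rate limit under discussion
          .option("failOnDataLoss", "true")        # surface retention losses
          .load())
```

With failOnDataLoss set, the scenario in this thread should surface as a query failure rather than an endless stream of empty batches.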
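[Archive note] One concrete way to check Sandish's lag-versus-retention theory, assuming access to the Kafka CLI tools on a broker host (broker and topic names below are placeholders). Note that the Spark source manages offsets in its own checkpoint rather than committing them to Kafka, so comparing the topic's retained offset range against the "startOffset" values in the progress JSON is more telling than describing the consumer group:

```shell
# Print the earliest (-2) and latest (-1) offsets Kafka still retains
# per partition, then compare them with the "startOffset" values in the
# StreamingQueryProgress JSON above. If the earliest retained offsets
# are greater than the checkpointed start offsets, the planned ranges
# have aged out of retention.
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list broker1:9092 --topic my_kafka_topic --time -2   # earliest

kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list broker1:9092 --topic my_kafka_topic --time -1   # latest
```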