Hey Charles,

If you are using maxOffsetsPerTrigger, you will likely keep resetting the
offsets every micro-batch, because:

1. Spark figures out a range of offsets to process (let's call them x and
   y, with x < y).
2. If those offsets have fallen out of the retention period, Spark will
   try to set the consumer to x, but the earliest offset still available
   in Kafka (call it z) is greater than both, i.e. z > y > x.
3. Since z > y, the entire planned range has already expired, so Spark
   will not process any of the data.
4. Goto 1.
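For reference, a minimal sketch of the configuration in question (the
broker address and the trigger limit below are placeholder values, not
taken from this thread; the topic name matches the one in the progress
log). Setting failOnDataLoss=true makes the query fail loudly when the
planned range has already expired, instead of silently planning empty
batches:

    // Sketch only: "spark" is an existing SparkSession; the broker
    // address and the limit are placeholders.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "my_kafka_topic")
      // Caps each micro-batch at a planned range [x, y):
      .option("maxOffsetsPerTrigger", "100000")
      // Fail fast when [x, y) has fallen out of retention:
      .option("failOnDataLoss", "true")
      .load()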
On Wed, Sep 11, 2019, 6:09 PM Charles vinodh <mig.flan...@gmail.com> wrote:

> Hi Sandish,
>
> As I have said, if the offset reset happened only once that would make
> sense. But I am not sure how to explain why the offset reset is happening
> for every micro-batch. Ideally, once the offset reset happens the app
> should move to a valid offset and start consuming data, but in my case
> the offset is getting reset for every batch and no data is ever getting
> generated.
>
> Thanks,
> Charles
>
> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN <sanysand...@gmail.com>
> wrote:
>
>> You can see this kind of error if there is consumer lag beyond the
>> Kafka retention period. You will not see any failures unless the option
>> below is set.
>>
>> Set failOnDataLoss=true to see the failures.
>>
>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh <mig.flan...@gmail.com>
>> wrote:
>>
>>> The only forms of rate limiting I have set are *maxOffsetsPerTrigger*
>>> and *fetch.message.max.bytes*.
>>>
>>> *"*It may be that you are trying to process records that have passed
>>> the retention period within Kafka.*"*
>>> If that were true, my offsets should be reset only once, ideally when
>>> my application starts. But my offsets are resetting for every batch.
>>> If my application were using offsets that are no longer available in
>>> Kafka, it would reset to the earliest or latest offset available in
>>> Kafka, and the next request made to Kafka should return proper data.
>>> But in my case the offsets are getting reset for every micro-batch and
>>> every batch is producing no data.
>>>
>>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Do you have rate limiting set on your stream? It may be that you are
>>>> trying to process records that have passed the retention period
>>>> within Kafka.
>>>>
>>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh <mig.flan...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to run a Spark application that ingests data from Kafka
>>>>> using Spark Structured Streaming and the library
>>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very
>>>>> weird issue where, during the execution of all my micro-batches, the
>>>>> Kafka consumer is not able to fetch the offsets and has its offsets
>>>>> reset, as shown in this log:
>>>>>
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>> Resetting offset for partition my-topic-5 to offset 1168959116.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>> Resetting offset for partition my-topic-1 to offset 1218619371.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>> Resetting offset for partition my-topic-8 to offset 1157205346.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>> Resetting offset for partition my-topic-21 to offset 1255403059.
>>>>>
>>>>> It would be reasonable for this reset to happen once when the
>>>>> application starts, because the offsets stored in my checkpoint are
>>>>> no longer valid and Spark has to reset them to a new value.
>>>>> But I am seeing this reset happening for every micro-batch execution
>>>>> in my streaming job. At the end, the streaming query progress emits
>>>>> the following:
>>>>>
>>>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made
>>>>> progress: {
>>>>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>>>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>>>>   "name" : null,
>>>>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>>>>   "batchId" : 189,
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "addBatch" : 127,
>>>>>     "getBatch" : 0,
>>>>>     "getEndOffset" : 0,
>>>>>     "queryPlanning" : 24,
>>>>>     "setOffsetRange" : 36,
>>>>>     "triggerExecution" : 1859,
>>>>>     "walCommit" : 1032
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>>>>>     "startOffset" : {
>>>>>       "my_kafka_topic" : {
>>>>>         "23" : 1206926686,
>>>>>         "8" : 1158514946,
>>>>>         "17" : 1258387219,
>>>>>         "11" : 1263091642,
>>>>>         "2" : 1226741128,
>>>>>         "20" : 1229560889,
>>>>>         "5" : 1170304913,
>>>>>         "14" : 1207333901,
>>>>>         "4" : 1274242728,
>>>>>         "13" : 1336386658,
>>>>>         "22" : 1260210993,
>>>>>         "7" : 1288639296,
>>>>>         "16" : 1247462229,
>>>>>         "10" : 1093157103,
>>>>>         "1" : 1219904858,
>>>>>         "19" : 1116269615,
>>>>>         "9" : 1238935018,
>>>>>         "18" : 1069224544,
>>>>>         "12" : 1256018541,
>>>>>         "3" : 1251150202,
>>>>>         "21" : 1256774117,
>>>>>         "15" : 1170591375,
>>>>>         "6" : 1185108169,
>>>>>         "24" : 1202342095,
>>>>>         "0" : 1165356330
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "my_kafka_topic" : {
>>>>>         "23" : 1206928043,
>>>>>         "8" : 1158516721,
>>>>>         "17" : 1258389219,
>>>>>         "11" : 1263093490,
>>>>>         "2" : 1226743225,
>>>>>         "20" : 1229562962,
>>>>>         "5" : 1170307882,
>>>>>         "14" : 1207335736,
>>>>>         "4" : 1274245585,
>>>>>         "13" : 1336388570,
>>>>>         "22" : 1260213582,
>>>>>         "7" : 1288641384,
>>>>>         "16" : 1247464311,
>>>>>         "10" : 1093159186,
>>>>>         "1" : 1219906407,
>>>>>         "19" : 1116271435,
>>>>>         "9" : 1238936994,
>>>>>         "18" : 1069226913,
>>>>>         "12" : 1256020926,
>>>>>         "3" : 1251152579,
>>>>>         "21" : 1256776910,
>>>>>         "15" : 1170593216,
>>>>>         "6" : 1185110032,
>>>>>         "24" : 1202344538,
>>>>>         "0" : 1165358262
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
>>>>>   }
>>>>> }
>>>>>
>>>>> In the above StreamingQueryProgress event the numInputRows field is
>>>>> zero, and this is the case for every micro-batch execution: no data
>>>>> is being produced whatsoever. So basically, for each batch my offsets
>>>>> are being reset and each batch produces zero rows. Since no work is
>>>>> being done and dynamic allocation is enabled, all my executors are
>>>>> getting killed. I have tried deleting my checkpoint and starting my
>>>>> application from scratch, and I am still facing the same issue. What
>>>>> could possibly be wrong with this? What lines of investigation should
>>>>> I take? If you are interested in getting Stack Overflow points, you
>>>>> can answer my question on SO here
>>>>> <https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>.
>>>>>
>>>>> Thanks,
>>>>> Charles
>>>>>
>> --
>> Sent from Gmail Mobile
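One way to test the lag-beyond-retention theory discussed above is to
compare the startOffset values from the progress JSON against the earliest
offsets the brokers still hold. A rough sketch using the plain Kafka
consumer API (the bootstrap address below is a placeholder; the topic name
comes from the thread):

    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // placeholder
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      val partitions = consumer.partitionsFor("my_kafka_topic").asScala
        .map(p => new TopicPartition(p.topic, p.partition))
      val earliest = consumer.beginningOffsets(partitions.asJava).asScala
      // If a partition's checkpointed startOffset is below its earliest
      // available offset, that range has been deleted by retention, which
      // would explain the constant resets and the empty batches.
      earliest.toSeq.sortBy(_._1.partition).foreach { case (tp, off) =>
        println(s"$tp earliest available offset = $off")
      }
    } finally {
      consumer.close()
    }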