Hi Charles,

Can you check whether any of the cases related to the output directory and
checkpoint location mentioned in the link below apply in your case?

https://kb.databricks.com/streaming/file-sink-streaming.html

Regards,
Dhaval
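For reference, the two settings that link is concerned with live on the write
side of the query. A minimal sketch, assuming a Kafka-to-S3 job like the one
described further down the thread (the paths, broker address, and topic are
placeholders, not the values from Charles's job):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("kafka-to-s3").getOrCreate()

    // Read side: the Kafka source under discussion.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "my_kafka_topic")
      .load()

    // Write side: the file sink. "path" is the output directory and
    // "checkpointLocation" is where offsets and sink metadata are kept;
    // both must stay stable across restarts.
    val query = df.selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("parquet")
      .option("path", "s3://my-s3-bucket/data/kafka/my_kafka_topic")
      .option("checkpointLocation", "s3://my-s3-bucket/checkpoints/my_kafka_topic") // placeholder
      .start()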
On Wed, Sep 11, 2019 at 9:29 PM Burak Yavuz <brk...@gmail.com> wrote:

> Hey Charles,
>
> If you are using maxOffsetsPerTrigger, you will likely reset the offsets
> every micro-batch, because:
> 1. Spark will figure out a range of offsets to process (let's call them
> x and y, where y > x).
> 2. If these offsets have fallen out of the retention period, Spark will
> try to reset to the earliest available offset, let's call it z, where
> z > y > x.
> 3. Since z > y, Spark will not process any of the data.
> 4. Goto 1.
>
> On Wed, Sep 11, 2019, 6:09 PM Charles vinodh <mig.flan...@gmail.com> wrote:
>
>> Hi Sandish,
>>
>> As I have said, if the offset reset happened only once that would make
>> sense. But I am not sure how to explain why the offset reset is
>> happening for every micro-batch. Ideally, once the offset reset happens,
>> the app should move to a valid offset and start consuming data; but in
>> my case the offset is getting reset for every batch and no data is ever
>> getting generated.
>>
>> Thanks,
>> Charles
>>
>> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN <sanysand...@gmail.com> wrote:
>>
>>> You can see this kind of error if the consumer lag is greater than the
>>> Kafka retention period. You will not see any failures if the option
>>> below is not set.
>>>
>>> Set failOnDataLoss=true to see the failures.
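For context, both options mentioned above are set on the Kafka source. A
minimal sketch (the broker address, topic, and per-trigger cap are
placeholder values):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("offset-reset-debug").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "my_kafka_topic")
      // Caps how many offsets each micro-batch may consume; this is the
      // rate limit involved in the reset loop Burak describes.
      .option("maxOffsetsPerTrigger", "100000") // placeholder value
      // Fail the query instead of silently resetting when the requested
      // offsets have already been aged out by retention.
      .option("failOnDataLoss", "true")
      .load()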
>>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh <mig.flan...@gmail.com> wrote:
>>>
>>>> The only forms of rate limiting I have set are *maxOffsetsPerTrigger*
>>>> and *fetch.message.max.bytes*.
>>>>
>>>> "It may be that you are trying to process records that have passed
>>>> the retention period within Kafka."
>>>>
>>>> If the above were true, then my offsets should ideally be reset only
>>>> once, when my application starts. But my offsets are resetting for
>>>> every batch. If my application is using offsets that are no longer
>>>> available in Kafka, it will reset to the earliest or latest offset
>>>> available in Kafka, and the next request made to Kafka should provide
>>>> proper data. But in my case, for all micro-batches the offsets are
>>>> getting reset and the batches are producing no data.
>>>>
>>>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Do you have rate limiting set on your stream? It may be that you are
>>>>> trying to process records that have passed the retention period
>>>>> within Kafka.
>>>>>
>>>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh <mig.flan...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to run a Spark application ingesting data from Kafka
>>>>>> using Spark Structured Streaming and the library
>>>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very
>>>>>> weird issue where, during the execution of all my micro-batches, the
>>>>>> Kafka consumer is not able to fetch the offsets and is having its
>>>>>> offsets reset, as shown in the log below:
>>>>>>
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-5 to offset 1168959116.
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-1 to offset 1218619371.
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-8 to offset 1157205346.
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-21 to offset 1255403059.
>>>>>>
>>>>>> It would be reasonable if this reset happened once in the lifetime
>>>>>> of the application, because the offsets stored in my checkpoint were
>>>>>> no longer valid and had to be reset to new values. But I am seeing
>>>>>> this reset happening for every micro-batch execution in my streaming
>>>>>> job. At the end, the streaming query progress emits the following:
>>>>>>
>>>>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>>>>>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>>>>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>>>>>   "name" : null,
>>>>>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>>>>>   "batchId" : 189,
>>>>>>   "numInputRows" : 0,
>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>   "durationMs" : {
>>>>>>     "addBatch" : 127,
>>>>>>     "getBatch" : 0,
>>>>>>     "getEndOffset" : 0,
>>>>>>     "queryPlanning" : 24,
>>>>>>     "setOffsetRange" : 36,
>>>>>>     "triggerExecution" : 1859,
>>>>>>     "walCommit" : 1032
>>>>>>   },
>>>>>>   "stateOperators" : [ ],
>>>>>>   "sources" : [ {
>>>>>>     "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>>>>>>     "startOffset" : {
>>>>>>       "my_kafka_topic" : {
>>>>>>         "23" : 1206926686,
>>>>>>         "8" : 1158514946,
>>>>>>         "17" : 1258387219,
>>>>>>         "11" : 1263091642,
>>>>>>         "2" : 1226741128,
>>>>>>         "20" : 1229560889,
>>>>>>         "5" : 1170304913,
>>>>>>         "14" : 1207333901,
>>>>>>         "4" : 1274242728,
>>>>>>         "13" : 1336386658,
>>>>>>         "22" : 1260210993,
>>>>>>         "7" : 1288639296,
>>>>>>         "16" : 1247462229,
>>>>>>         "10" : 1093157103,
>>>>>>         "1" : 1219904858,
>>>>>>         "19" : 1116269615,
>>>>>>         "9" : 1238935018,
>>>>>>         "18" : 1069224544,
>>>>>>         "12" : 1256018541,
>>>>>>         "3" : 1251150202,
>>>>>>         "21" : 1256774117,
>>>>>>         "15" : 1170591375,
>>>>>>         "6" : 1185108169,
>>>>>>         "24" : 1202342095,
>>>>>>         "0" : 1165356330
>>>>>>       }
>>>>>>     },
>>>>>>     "endOffset" : {
>>>>>>       "my_kafka_topic" : {
>>>>>>         "23" : 1206928043,
>>>>>>         "8" : 1158516721,
>>>>>>         "17" : 1258389219,
>>>>>>         "11" : 1263093490,
>>>>>>         "2" : 1226743225,
>>>>>>         "20" : 1229562962,
>>>>>>         "5" : 1170307882,
>>>>>>         "14" : 1207335736,
>>>>>>         "4" : 1274245585,
>>>>>>         "13" : 1336388570,
>>>>>>         "22" : 1260213582,
>>>>>>         "7" : 1288641384,
>>>>>>         "16" : 1247464311,
>>>>>>         "10" : 1093159186,
>>>>>>         "1" : 1219906407,
>>>>>>         "19" : 1116271435,
>>>>>>         "9" : 1238936994,
>>>>>>         "18" : 1069226913,
>>>>>>         "12" : 1256020926,
>>>>>>         "3" : 1251152579,
>>>>>>         "21" : 1256776910,
>>>>>>         "15" : 1170593216,
>>>>>>         "6" : 1185110032,
>>>>>>         "24" : 1202344538,
>>>>>>         "0" : 1165358262
>>>>>>       }
>>>>>>     },
>>>>>>     "numInputRows" : 0,
>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>   } ],
>>>>>>   "sink" : {
>>>>>>     "description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> In the above StreamingQueryProgress event, the numInputRows field is
>>>>>> zero, and this is the case for all micro-batch executions; no data
>>>>>> is being produced whatsoever.
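A progress event like the one above can also be watched programmatically
rather than grepped out of the driver log. A minimal sketch using the
streaming listener API, assuming a SparkSession named spark:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // Logs one line per micro-batch; zero rows with advancing end offsets
    // is exactly the symptom described above.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val p = event.progress
        println(s"batchId=${p.batchId} numInputRows=${p.numInputRows} " +
          s"perSource=${p.sources.map(_.numInputRows).mkString(",")}")
      }
    })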
>>>>>> So basically, for each batch my offsets are being reset and each
>>>>>> batch produces zero rows. Since there is no work being done, and
>>>>>> since dynamic allocation is enabled, all my executors get killed. I
>>>>>> have tried deleting my checkpoint and starting my application from
>>>>>> scratch, and I am still facing the same issue. What could possibly
>>>>>> be wrong with this? What lines of investigation should I take? If
>>>>>> you are interested in earning Stack Overflow points, you can answer
>>>>>> my question on SO here
>>>>>> <https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>.
>>>>>>
>>>>>> Thanks,
>>>>>> Charles
>>>
>>> --
>>> Sent from Gmail Mobile
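One concrete line of investigation: compare the startOffset values recorded
in the progress JSON above with the earliest offsets the brokers still
retain. A minimal sketch using the plain Kafka consumer API (the bootstrap
server is a placeholder):

    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    // Prints the earliest and latest offsets the brokers still hold for
    // each partition. If a startOffset from the checkpoint is below the
    // earliest value here, that range has been aged out by retention.
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // placeholder
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val partitions = consumer.partitionsFor("my_kafka_topic").asScala
      .map(p => new TopicPartition(p.topic, p.partition)).asJava
    val earliest = consumer.beginningOffsets(partitions).asScala
    val latest = consumer.endOffsets(partitions).asScala
    earliest.toSeq.sortBy(_._1.partition).foreach { case (tp, start) =>
      println(s"$tp earliest=$start latest=${latest(tp)}")
    }
    consumer.close()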