Thanks Dhaval, that fixed the issue. The constant resetting of Kafka
offsets misled me about the issue. Please feel free the answer the SO
question here
<https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>
if
you would like to..





On Wed, Sep 11, 2019 at 9:03 PM Dhaval Patel <mailto.dhava...@gmail.com>
wrote:

> Hi Charles,
>
> Can you check is any of the case related to output directory and
> checkpoint location mentioned in below link is applicable in your case?
>
> https://kb.databricks.com/streaming/file-sink-streaming.html
>
> Regards
> Dhaval
>
> On Wed, Sep 11, 2019 at 9:29 PM Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hey Charles,
>> If you are using maxOffsetsPerTrigger, you will likely rest the offsets
>> every microbatch, because:
>>  1. Spark will figure out a range of offsets to process (let's call them
>> x and y)
>>  2. If these offsets have fallen out of the retention period, Spark will
>> try to set the offset to x which is less than z > y > x.
>>  3. Since z > y, Spark will not process any of the data
>>  4. Goto 1
>>
>> On Wed, Sep 11, 2019, 6:09 PM Charles vinodh <mig.flan...@gmail.com>
>> wrote:
>>
>>> Hi Sandish,
>>>
>>> as I have said if the offset reset happens only once that would make
>>> sense. But I am not sure how to explain why the offset reset is happening
>>> for every micro-batch...
>>> ideally once the offset reset happens the app should move to a valid
>>> offset and start consuming data. but in my case for every batch the offset
>>> is getting reset and no data is ever getting generated.
>>>
>>> Thanks,
>>> Charles
>>>
>>> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN <sanysand...@gmail.com>
>>> wrote:
>>>
>>>> You can see this kind of error, if there is consumer lag more than
>>>> Kafka retention period.
>>>> You will not see any failures if below option is not set.
>>>>
>>>> Set failOnDataLoss=true option to see failures.
>>>>
>>>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh <mig.flan...@gmail.com>
>>>> wrote:
>>>>
>>>>> The only form of rate limiting I have set is *maxOffsetsPerTrigger *
>>>>> and *fetch.message.max.bytes. *
>>>>>
>>>>> *"*may be that you are trying to process records that have passed the
>>>>> retention period within Kafka.*"*
>>>>> If the above is true then I should have my offsets reset only once
>>>>> ideally when my application starts. But mu offsets are resetting for every
>>>>> batch. if my application is using offsets that are no longer available in
>>>>> Kafka it will reset to earliest or latest offset available in Kafka and 
>>>>> the
>>>>> next request made to Kafka should provide proper data. But in case for all
>>>>> micro-batches the offsets are getting reseted and the batch is producing 
>>>>> no
>>>>> data.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>>>
>>>>>> Do you have rate limiting set on your stream? It may be that you are
>>>>>> trying to process records that have passed the retention period within
>>>>>> Kafka.
>>>>>>
>>>>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh <mig.flan...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to run a spark application ingesting data from Kafka
>>>>>>> using the Spark structured streaming and the spark library
>>>>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very 
>>>>>>> weird
>>>>>>> issue where during execution of all my micro-batches the Kafka consumer 
>>>>>>> is
>>>>>>> not able to fetch the offsets and its having its offsets reset as show
>>>>>>> below in this log
>>>>>>>
>>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>>>>>>
>>>>>>>
>>>>>>> It is reasonable if this resetting happens once in application due
>>>>>>> to the fact that the offsets stored in my checkpoint are no longer valid
>>>>>>> and will have to reset our offsets to a new value. But I am seeing this
>>>>>>> reset happening for every micro batch execution in my streaming job. In 
>>>>>>> at
>>>>>>> the end the streaming query progress emits the following
>>>>>>>
>>>>>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made 
>>>>>>> progress: {
>>>>>>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>>>>>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>>>>>>   "name" : null,
>>>>>>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>>>>>>   "batchId" : 189,
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "addBatch" : 127,
>>>>>>>     "getBatch" : 0,
>>>>>>>     "getEndOffset" : 0,
>>>>>>>     "queryPlanning" : 24,
>>>>>>>     "setOffsetRange" : 36,
>>>>>>>     "triggerExecution" : 1859,
>>>>>>>     "walCommit" : 1032
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>>>>>>>     "startOffset" : {
>>>>>>>       "my_kafka_topic" : {
>>>>>>>         "23" : 1206926686,
>>>>>>>         "8" : 1158514946,
>>>>>>>         "17" : 1258387219,
>>>>>>>         "11" : 1263091642,
>>>>>>>         "2" : 1226741128,
>>>>>>>         "20" : 1229560889,
>>>>>>>         "5" : 1170304913,
>>>>>>>         "14" : 1207333901,
>>>>>>>         "4" : 1274242728,
>>>>>>>         "13" : 1336386658,
>>>>>>>         "22" : 1260210993,
>>>>>>>         "7" : 1288639296,
>>>>>>>         "16" : 1247462229,
>>>>>>>         "10" : 1093157103,
>>>>>>>         "1" : 1219904858,
>>>>>>>         "19" : 1116269615,
>>>>>>>         "9" : 1238935018,
>>>>>>>         "18" : 1069224544,
>>>>>>>         "12" : 1256018541,
>>>>>>>         "3" : 1251150202,
>>>>>>>         "21" : 1256774117,
>>>>>>>         "15" : 1170591375,
>>>>>>>         "6" : 1185108169,
>>>>>>>         "24" : 1202342095,
>>>>>>>         "0" : 1165356330
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "endOffset" : {
>>>>>>>       "my_kafka_topic" : {
>>>>>>>         "23" : 1206928043,
>>>>>>>         "8" : 1158516721,
>>>>>>>         "17" : 1258389219,
>>>>>>>         "11" : 1263093490,
>>>>>>>         "2" : 1226743225,
>>>>>>>         "20" : 1229562962,
>>>>>>>         "5" : 1170307882,
>>>>>>>         "14" : 1207335736,
>>>>>>>         "4" : 1274245585,
>>>>>>>         "13" : 1336388570,
>>>>>>>         "22" : 1260213582,
>>>>>>>         "7" : 1288641384,
>>>>>>>         "16" : 1247464311,
>>>>>>>         "10" : 1093159186,
>>>>>>>         "1" : 1219906407,
>>>>>>>         "19" : 1116271435,
>>>>>>>         "9" : 1238936994,
>>>>>>>         "18" : 1069226913,
>>>>>>>         "12" : 1256020926,
>>>>>>>         "3" : 1251152579,
>>>>>>>         "21" : 1256776910,
>>>>>>>         "15" : 1170593216,
>>>>>>>         "6" : 1185110032,
>>>>>>>         "24" : 1202344538,
>>>>>>>         "0" : 1165358262
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : 
>>>>>>> "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> In the above StreamingQueryProgress event the numInputRows fields
>>>>>>>  is zero and this is the case for all micro batch executions and no 
>>>>>>> data is
>>>>>>> being produced whatsoever. So basically for each batch my offsets are 
>>>>>>> being
>>>>>>> reset and each batch is producing zero rows. Since there is no work 
>>>>>>> being
>>>>>>> done and since dynamic allocation is enabled all my executors killed... 
>>>>>>> I
>>>>>>> have tried deleting my checkpoint and started my application from 
>>>>>>> scratch
>>>>>>> and I am still facing the same issue. What could possibly be wrong 
>>>>>>> this?...
>>>>>>> what lines of investigation should I take?  If you are interested in
>>>>>>> getting Stackoverflow point you can answer my question in SO here
>>>>>>> <https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Charles
>>>>>>>
>>>>>>>
>>>>>> --
>>>> Sent from Gmail Mobile
>>>>
>>>

Reply via email to