Hey Charles,
If you are using maxOffsetsPerTrigger, you will likely rest the offsets
every microbatch, because:
 1. Spark will figure out a range of offsets to process (let's call them x
and y)
 2. If these offsets have fallen out of the retention period, Spark will
try to set the offset to x which is less than z > y > x.
 3. Since z > y, Spark will not process any of the data
 4. Goto 1

On Wed, Sep 11, 2019, 6:09 PM Charles vinodh <mig.flan...@gmail.com> wrote:

> Hi Sandish,
>
> as I have said if the offset reset happens only once that would make
> sense. But I am not sure how to explain why the offset reset is happening
> for every micro-batch...
> ideally once the offset reset happens the app should move to a valid
> offset and start consuming data. but in my case for every batch the offset
> is getting reset and no data is ever getting generated.
>
> Thanks,
> Charles
>
> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN <sanysand...@gmail.com>
> wrote:
>
>> You can see this kind of error, if there is consumer lag more than Kafka
>> retention period.
>> You will not see any failures if below option is not set.
>>
>> Set failOnDataLoss=true option to see failures.
>>
>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh <mig.flan...@gmail.com>
>> wrote:
>>
>>> The only form of rate limiting I have set is *maxOffsetsPerTrigger *and
>>> *fetch.message.max.bytes. *
>>>
>>> *"*may be that you are trying to process records that have passed the
>>> retention period within Kafka.*"*
>>> If the above is true then I should have my offsets reset only once
>>> ideally when my application starts. But mu offsets are resetting for every
>>> batch. if my application is using offsets that are no longer available in
>>> Kafka it will reset to earliest or latest offset available in Kafka and the
>>> next request made to Kafka should provide proper data. But in case for all
>>> micro-batches the offsets are getting reseted and the batch is producing no
>>> data.
>>>
>>>
>>>
>>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Do you have rate limiting set on your stream? It may be that you are
>>>> trying to process records that have passed the retention period within
>>>> Kafka.
>>>>
>>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh <mig.flan...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to run a spark application ingesting data from Kafka using
>>>>> the Spark structured streaming and the spark library
>>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
>>>>> issue where during execution of all my micro-batches the Kafka consumer is
>>>>> not able to fetch the offsets and its having its offsets reset as show
>>>>> below in this log
>>>>>
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>>>>
>>>>>
>>>>> It is reasonable if this resetting happens once in application due to
>>>>> the fact that the offsets stored in my checkpoint are no longer valid and
>>>>> will have to reset our offsets to a new value. But I am seeing this reset
>>>>> happening for every micro batch execution in my streaming job. In at the
>>>>> end the streaming query progress emits the following
>>>>>
>>>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made 
>>>>> progress: {
>>>>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>>>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>>>>   "name" : null,
>>>>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>>>>   "batchId" : 189,
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "addBatch" : 127,
>>>>>     "getBatch" : 0,
>>>>>     "getEndOffset" : 0,
>>>>>     "queryPlanning" : 24,
>>>>>     "setOffsetRange" : 36,
>>>>>     "triggerExecution" : 1859,
>>>>>     "walCommit" : 1032
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>>>>>     "startOffset" : {
>>>>>       "my_kafka_topic" : {
>>>>>         "23" : 1206926686,
>>>>>         "8" : 1158514946,
>>>>>         "17" : 1258387219,
>>>>>         "11" : 1263091642,
>>>>>         "2" : 1226741128,
>>>>>         "20" : 1229560889,
>>>>>         "5" : 1170304913,
>>>>>         "14" : 1207333901,
>>>>>         "4" : 1274242728,
>>>>>         "13" : 1336386658,
>>>>>         "22" : 1260210993,
>>>>>         "7" : 1288639296,
>>>>>         "16" : 1247462229,
>>>>>         "10" : 1093157103,
>>>>>         "1" : 1219904858,
>>>>>         "19" : 1116269615,
>>>>>         "9" : 1238935018,
>>>>>         "18" : 1069224544,
>>>>>         "12" : 1256018541,
>>>>>         "3" : 1251150202,
>>>>>         "21" : 1256774117,
>>>>>         "15" : 1170591375,
>>>>>         "6" : 1185108169,
>>>>>         "24" : 1202342095,
>>>>>         "0" : 1165356330
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "my_kafka_topic" : {
>>>>>         "23" : 1206928043,
>>>>>         "8" : 1158516721,
>>>>>         "17" : 1258389219,
>>>>>         "11" : 1263093490,
>>>>>         "2" : 1226743225,
>>>>>         "20" : 1229562962,
>>>>>         "5" : 1170307882,
>>>>>         "14" : 1207335736,
>>>>>         "4" : 1274245585,
>>>>>         "13" : 1336388570,
>>>>>         "22" : 1260213582,
>>>>>         "7" : 1288641384,
>>>>>         "16" : 1247464311,
>>>>>         "10" : 1093159186,
>>>>>         "1" : 1219906407,
>>>>>         "19" : 1116271435,
>>>>>         "9" : 1238936994,
>>>>>         "18" : 1069226913,
>>>>>         "12" : 1256020926,
>>>>>         "3" : 1251152579,
>>>>>         "21" : 1256776910,
>>>>>         "15" : 1170593216,
>>>>>         "6" : 1185110032,
>>>>>         "24" : 1202344538,
>>>>>         "0" : 1165358262
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : 
>>>>> "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> In the above StreamingQueryProgress event the numInputRows fields  is
>>>>> zero and this is the case for all micro batch executions and no data is
>>>>> being produced whatsoever. So basically for each batch my offsets are 
>>>>> being
>>>>> reset and each batch is producing zero rows. Since there is no work being
>>>>> done and since dynamic allocation is enabled all my executors killed... I
>>>>> have tried deleting my checkpoint and started my application from scratch
>>>>> and I am still facing the same issue. What could possibly be wrong 
>>>>> this?...
>>>>> what lines of investigation should I take?  If you are interested in
>>>>> getting Stackoverflow point you can answer my question in SO here
>>>>> <https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Charles
>>>>>
>>>>>
>>>> --
>> Sent from Gmail Mobile
>>
>

Reply via email to