Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Charles vinodh
Thanks Dhaval, that fixed the issue. The constant resetting of the Kafka
offsets misled me about the root cause. Please feel free to answer the SO
question here if you would like to.
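
For anyone who finds this thread later: the fix came from the file-sink
metadata issue covered in Dhaval's link. Below is a minimal cleanup sketch,
assuming a SparkSession named spark and hypothetical S3 paths (not
necessarily the poster's actual layout). The file sink records committed
batches under <output-path>/_spark_metadata, so deleting only the checkpoint
while reusing the old output directory can leave the restarted query
behaving as if there is nothing new to write.

import org.apache.hadoop.fs.Path

// Hedged sketch; paths are assumptions. Remove both the checkpoint and the
// file sink's _spark_metadata directory before restarting from scratch.
val conf = spark.sparkContext.hadoopConfiguration
val checkpoint   = new Path("s3://my-s3-bucket/checkpoints/my_kafka_topic")
val sinkMetadata = new Path("s3://my-s3-bucket/data/kafka/my_kafka_topic/_spark_metadata")
checkpoint.getFileSystem(conf).delete(checkpoint, true)      // recursive delete
sinkMetadata.getFileSystem(conf).delete(sinkMetadata, true)  // recursive delete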

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Dhaval Patel
Hi Charles,

Can you check whether any of the cases related to the output directory and
checkpoint location mentioned in the link below apply to your situation?

https://kb.databricks.com/streaming/file-sink-streaming.html
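
For context, a minimal sketch of the file-sink wiring the linked article
discusses. The output path below is the one from the progress event later in
this thread; the checkpoint path and the DataFrame df are assumptions. The
two locations are configured separately, and the article covers what goes
wrong when one is deleted or reused without the other.

// Hedged sketch; checkpoint path is assumed, not the poster's actual job.
val query = df.writeStream
  .format("parquet")
  .option("path", "s3://my-s3-bucket/data/kafka/my_kafka_topic")                 // file sink output (holds _spark_metadata)
  .option("checkpointLocation", "s3://my-s3-bucket/checkpoints/my_kafka_topic")  // offsets and commit log
  .start()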

Regards
Dhaval

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Burak Yavuz
Hey Charles,
If you are using maxOffsetsPerTrigger, you will likely reset the offsets
every microbatch, because:
 1. Spark figures out a range of offsets to process (let's call them x
and y).
 2. If those offsets have fallen out of the retention period, Spark still
tries to fetch from x, but the earliest offset Kafka now retains (call it
z) satisfies z > y > x, so the consumer is reset to z.
 3. Since z > y, Spark will not process any of the data.
 4. Goto 1.
A concrete sketch of this loop follows below.
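
To make the loop concrete, here is a minimal sketch of a rate-limited reader
together with the offset arithmetic. The broker address and all numbers are
assumptions for illustration; only the maxOffsetsPerTrigger option itself
comes from this thread.

// Hedged sketch; broker and numbers are assumed.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // assumed
  .option("subscribe", "my_kafka_topic")
  .option("maxOffsetsPerTrigger", 100000L)            // caps each batch: y = x + 100000
  .load()

// The loop in numbers (all assumed):
//   x = 1000000   planned start, taken from the checkpoint
//   y = 1100000   planned end, x + maxOffsetsPerTrigger
//   z = 5000000   earliest offset Kafka still retains
// Fetching from x triggers a reset to z; since z > y, the planned range
// [x, y] holds no retained data, the batch comes back empty, and the next
// trigger plans another stale range.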

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Charles vinodh
Hi Sandish,

As I have said, if the offset reset happened only once that would make
sense, but I am not sure how to explain why the offset reset is happening
for every micro-batch. Ideally, once the offset reset happens, the app
should move to a valid offset and start consuming data, but in my case the
offset is getting reset for every batch and no data is ever getting
generated.

Thanks,
Charles

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Sandish Kumar HN
You can see this kind of error if the consumer lag exceeds the Kafka
retention period. You will not see any failures unless the option below is
set.

Set the failOnDataLoss=true option to surface the failures.
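
A minimal sketch of where that option goes; the broker address is assumed,
and only failOnDataLoss itself comes from this message.

// Hedged sketch; broker is assumed.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // assumed
  .option("subscribe", "my_kafka_topic")
  .option("failOnDataLoss", "true")  // fail fast instead of silently resetting
  .load()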

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Charles vinodh
The only form of rate limiting I have set is *maxOffsetsPerTrigger* and
*fetch.message.max.bytes*.

"It may be that you are trying to process records that have passed the
retention period within Kafka."
If the above were true, then my offsets should be reset only once, ideally
when my application starts. But my offsets are resetting for every batch.
If my application is using offsets that are no longer available in Kafka,
it will reset to the earliest or latest offset available in Kafka, and the
next request made to Kafka should return proper data. But in my case the
offsets are getting reset for all micro-batches and every batch produces no
data.
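
A hedged aside for later readers: Kafka consumer properties reach the Spark
Kafka source only when prefixed with kafka., and fetch.message.max.bytes is
the old 0.8-consumer name; the 0.10+ consumer behind spark-sql-kafka-0-10
uses max.partition.fetch.bytes instead, so the setting may not be taking
effect as intended. A sketch with an assumed value:

// Hedged sketch; the value is an assumption.
val df = spark.readStream
  .format("kafka")
  .option("subscribe", "my_kafka_topic")
  .option("kafka.max.partition.fetch.bytes", "10485760")  // 0.10+ consumer equivalent
  .load()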

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Burak Yavuz
Do you have rate limiting set on your stream? It may be that you are trying
to process records that have passed the retention period within Kafka.

Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Charles vinodh
Hi,

I am trying to run a Spark application ingesting data from Kafka using
Spark Structured Streaming and the library
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
issue where, during the execution of all my micro-batches, the Kafka
consumer is not able to fetch the offsets and is having its offsets reset,
as shown below in this log:

19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-5 to offset 1168959116.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-1 to offset 1218619371.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-8 to offset 1157205346.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-21 to offset 1255403059.


It would be reasonable for this reset to happen once per application run,
because the offsets stored in my checkpoint are no longer valid and have to
be reset to a new value. But I am seeing this reset happening for every
micro-batch execution in my streaming job. At the end, the streaming query
progress emits the following:

19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
  "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
  "name" : null,
  "timestamp" : "2019-09-10T15:55:00.000Z",
  "batchId" : 189,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"addBatch" : 127,
"getBatch" : 0,
"getEndOffset" : 0,
"queryPlanning" : 24,
"setOffsetRange" : 36,
"triggerExecution" : 1859,
"walCommit" : 1032
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaV2[Subscribe[my_kafka_topic]]",
"startOffset" : {
  "my_kafka_topic" : {
"23" : 1206926686,
"8" : 1158514946,
"17" : 1258387219,
"11" : 1263091642,
"2" : 1226741128,
"20" : 1229560889,
"5" : 1170304913,
"14" : 1207333901,
"4" : 1274242728,
"13" : 1336386658,
"22" : 1260210993,
"7" : 1288639296,
"16" : 1247462229,
"10" : 1093157103,
"1" : 1219904858,
"19" : 1116269615,
"9" : 1238935018,
"18" : 1069224544,
"12" : 1256018541,
"3" : 1251150202,
"21" : 1256774117,
"15" : 1170591375,
"6" : 1185108169,
"24" : 1202342095,
"0" : 1165356330
  }
},
"endOffset" : {
  "my_kafka_topic" : {
"23" : 1206928043,
"8" : 1158516721,
"17" : 1258389219,
"11" : 1263093490,
"2" : 1226743225,
"20" : 1229562962,
"5" : 1170307882,
"14" : 1207335736,
"4" : 1274245585,
"13" : 1336388570,
"22" : 1260213582,
"7" : 1288641384,
"16" : 1247464311,
"10" : 1093159186,
"1" : 1219906407,
"19" : 1116271435,
"9" : 1238936994,
"18" : 1069226913,
"12" : 1256020926,
"3" : 1251152579,
"21" : 1256776910,
"15" : 1170593216,
"6" : 1185110032,
"24" : 1202344538,
"0" : 1165358262
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
  }
}


In the above StreamingQueryProgress event the numInputRows field is zero,
and this is the case for all micro-batch executions; no data is being
produced whatsoever. So basically, for each batch my offsets are being
reset and each batch produces zero rows. Since there is no work being done,
and since dynamic allocation is enabled, all my executors get killed... I
have tried deleting my checkpoint and starting my application from scratch,
and I am still facing the same issue. What could possibly be wrong here?
What lines of investigation should I take? If you are interested in earning
Stack Overflow points, you can answer my question in SO here.


Thanks,
Charles
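
For readers reconstructing the setup, a minimal end-to-end sketch of the
pipeline described above. The topic, the sink path, and the use of
maxOffsetsPerTrigger come from this thread; the broker address, the
rate-limit value, the checkpoint path, and the one-minute trigger (suggested
by the progress timestamps) are assumptions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-to-s3").getOrCreate()

// Hedged sketch of the source; broker and rate-limit value are assumed.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "my_kafka_topic")
  .option("maxOffsetsPerTrigger", 100000L)
  .load()

// Sink path is from the progress event above; checkpoint path is assumed.
val query = df.writeStream
  .format("parquet")
  .option("path", "s3://my-s3-bucket/data/kafka/my_kafka_topic")
  .option("checkpointLocation", "s3://my-s3-bucket/checkpoints/my_kafka_topic")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

// After the first batch completes, lastProgress carries the numInputRows
// metric discussed above.
println(query.lastProgress)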