[ 
https://issues.apache.org/jira/browse/SPARK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798988#comment-16798988
 ] 

Gabor Somogyi commented on SPARK-27218:
---------------------------------------

[~emasab] I don't fully understand your test code:
* When the readStream API is used, start should be called.
* When the read API is used, load should be called.
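
As a rough sketch of the two variants (not taken from the reporter's code): in
both cases load() only builds the DataFrame, and a streaming query runs only
once start() is called on writeStream. The broker address, the console sink and
the object name are assumptions made for illustration; the topic and the
startingOffsets value are the ones from the report.

```scala
import org.apache.spark.sql.SparkSession

object BatchVsStreaming {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-batch-vs-streaming")
      .master("local[*]")
      .getOrCreate()

    // Streaming: readStream ... load() only defines the source.
    val streamDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "test1")
      .option("startingOffsets", "earliest")
      .load()

    // The streaming query begins executing only when start() is called.
    val query = streamDf.writeStream
      .format("console") // assumed sink, chosen only for illustration
      .start()

    // Batch: read ... load() returns a DataFrame that is evaluated by an action.
    val batchDf = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "test1")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
    batchDf.show()

    query.awaitTermination()
  }
}
```

Running this needs the spark-sql-kafka-0-10 package on the classpath and a
reachable Kafka broker.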

All in all, are you using batch or streaming?

If streaming is used, startingOffsets takes effect only when the query is 
started for the first time.
On later runs, the latest processed offset is restored from the checkpoint 
location instead.
Of course, if the checkpoint location is temporary, this doesn't apply.
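
The rule above can be written down as a small toy model. This is only an
illustration of the described behaviour, not Spark's implementation:
StartOffsetModel and resolve are hypothetical names, and the model covers a
single partition with only the "earliest" and "latest" settings.

```scala
// Toy model of the behaviour described above; NOT Spark's actual code.
object StartOffsetModel {
  /** Picks the offset a query run begins from, for one partition.
    *
    * @param checkpointed    offset restored from the checkpoint location, if any
    *                        (None on the first run or with a temporary checkpoint)
    * @param startingOffsets the configured option, "earliest" or "latest"
    * @param earliest        earliest offset currently available in the partition
    * @param latest          latest offset currently available in the partition
    */
  def resolve(checkpointed: Option[Long],
              startingOffsets: String,
              earliest: Long,
              latest: Long): Long =
    checkpointed.getOrElse {
      // startingOffsets is consulted only when nothing was restored.
      startingOffsets match {
        case "earliest" => earliest
        case "latest"   => latest
        case other =>
          throw new IllegalArgumentException(s"not covered by this sketch: $other")
      }
    }
}
```

With the offsets from the log in the report (1740 and 15922),
StartOffsetModel.resolve(None, "earliest", 1740L, 15922L) gives 1740, while
passing Some(9000L) as the restored offset gives 9000 regardless of the
startingOffsets setting.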


> spark-sql-kafka-0-10 startingOffset=earliest not working as expected with 
> streaming
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-27218
>                 URL: https://issues.apache.org/jira/browse/SPARK-27218
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Windows 10, spark-2.4.0-bin-hadoop2.7
>            Reporter: Emanuele Sabellico
>            Priority: Minor
>
> Hi, I'm trying to stream a Kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 
> using code like this:
> {noformat}
> spark.readStream
>   .format("kafka")
>   .option("subscribe", "test1")
>   .option("startingOffsets", "earliest")
>   .load()
> {noformat}
> I find that Spark doesn't start from the earliest offset but from the latest. 
> More precisely, it initially gets the earliest offsets but then seeks to the 
> end, skipping the messages in between.
> In the logs I find this:
> {noformat}
> 2019-03-20 11:59:50 INFO  KafkaMicroBatchReader:54 - Initial offsets: 
> {"test1":{"0":1740}}
> 2019-03-20 11:59:50 INFO  Fetcher:583 - [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
>  Resetting offset for partition test1-0 to offset 15922.
> {noformat}
> Looking into the code, I find that _KafkaMicroBatchReader.setOffsetRange_ 
> calls _KafkaOffsetReader.fetchLatestOffsets_, which in turn does a 
> _consumer.seekToEnd(partitions)_.
> According to the documentation, I expected the stream to start from the 
> earliest offset in this case. Is there something that I'm misunderstanding 
> or doing wrong?
> Thanks in advance!
>  


