Emanuele Sabellico created SPARK-27218:
------------------------------------------

             Summary: spark-sql-kafka-0-10 startingOffsets=earliest not working as expected with streaming
                 Key: SPARK-27218
                 URL: https://issues.apache.org/jira/browse/SPARK-27218
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.0
         Environment: Windows 10, spark-2.4.0-bin-hadoop2.7
            Reporter: Emanuele Sabellico


Hi, I'm trying to stream a Kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 with code like this:
{noformat}
spark.readStream
  .format("kafka")
  .option("subscribe", topics)
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro_with_schema_registry($"value", avroOptions) as "body")
{noformat}
I find that Spark doesn't start from the earliest offset but from the latest. More precisely, it initially fetches the earliest offsets, but then it seeks to the end, skipping the messages in between.
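
For reference, here is a minimal, self-contained sketch of how such a query can be started end to end (the broker address, topic name, checkpoint path and console sink here are placeholders, and the custom from_avro_with_schema_registry helper is left out):
{noformat}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("earliest-offsets-repro")
  .getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "test1")                         // placeholder topic
  .option("startingOffsets", "earliest")
  .load()

// Dump key/value/offset to the console just to see which offsets are actually read.
// The checkpoint directory is fresh, since startingOffsets only applies when a
// query starts without existing offsets in its checkpoint.
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "offset")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/earliest-repro-checkpoint")
  .start()

query.awaitTermination()
{noformat}
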
In the logs I find this:
{noformat}
2019-03-20 11:59:50 INFO  KafkaMicroBatchReader:54 - Initial offsets: 
{"test1":{"0":1740}}
2019-03-20 11:59:50 INFO  Fetcher:583 - [Consumer clientId=consumer-1, 
groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
 Resetting offset for partition test1-0 to offset 15922.
{noformat}
Looking into the code, I see that _KafkaMicroBatchReader.setOffsetRange_ calls _KafkaOffsetReader.fetchLatestOffsets_, which in turn does a _consumer.seekToEnd(partitions)_.
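
To illustrate what that call does, here is a small standalone sketch against the plain Kafka consumer API (the broker, topic and partition values are made up): seekToEnd moves the consumer's position to the log-end offset, so everything between the current position and the end is never consumed.
{noformat}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder broker
props.put("group.id", "seek-to-end-demo")
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val tp = new TopicPartition("test1", 0)
consumer.assign(java.util.Arrays.asList(tp))

// Position at the earliest available offset, e.g. 1740 in the log above.
consumer.seekToBeginning(java.util.Arrays.asList(tp))
println(s"earliest position: ${consumer.position(tp)}")

// seekToEnd jumps straight to the log-end offset, e.g. 15922 in the log above,
// which is why the records in between get skipped.
consumer.seekToEnd(java.util.Arrays.asList(tp))
println(s"log-end position: ${consumer.position(tp)}")

consumer.close()
{noformat}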

According to the documentation, I was expecting the stream to start from the earliest offset in this case. Is there something I'm misunderstanding or doing wrong?

Thanks in advance!

 


