[ https://issues.apache.org/jira/browse/SPARK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799009#comment-16799009 ]

Emanuele Sabellico edited comment on SPARK-27218 at 3/22/19 1:27 PM:
---------------------------------------------------------------------

Hi Gabor,

I'm using Structured Streaming and calling load as shown in the first example 
here:

[https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html]
{code:java}
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
{code}
 

The issue is about the first time the stream is started, when the checkpoint 
has not been written yet. I'd like to start from the earliest offset of the 
topic, not from the latest: read the messages that are already in the topic and 
then continue with the streaming.
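
For reference, this is essentially the full pipeline I have in mind (a minimal 
sketch; the console sink and the checkpoint path are just placeholders):
{code:java}
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  // should make the very first run begin at the earliest available offsets
  .option("startingOffsets", "earliest")
  .load()

// placeholder sink and checkpoint path, just to illustrate the full pipeline:
// on restarts, the checkpoint (not startingOffsets) decides where to resume
df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint-topic1")
  .start()
{code}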

The doc says:
{noformat}
The start point when a query is started, either "earliest" which is from the 
earliest offsets, "latest" which is just from the latest offsets, or a json 
string specifying a starting offset for each TopicPartition. In the json, -2 as 
an offset can be used to refer to earliest, -1 to latest. Note: For batch 
queries, latest (either implicitly or by using -1 in json) is not allowed. For 
streaming queries, this only applies when a new query is started, and that 
resuming will always pick up from where the query left off. Newly discovered 
partitions during a query will start at earliest.
{noformat}
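If I read that correctly, the JSON form should also let me pin individual 
partitions to earliest, something like this (a sketch; the partition numbers 
are made up):
{code:java}
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  // -2 means "earliest" for that partition, per the doc quoted above
  .option("startingOffsets", """{"topic1":{"0":-2,"1":-2}}""")
  .load()
{code}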
I understand that earliest can be used with readStream, and indeed nothing 
stops me from setting it, but it doesn't take effect: the stream still starts 
from the latest offsets.
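
For comparison, a batch query over the same topic lets me pin both ends 
explicitly, and per the doc earliest is valid there too (same placeholder 
brokers and topic as above):
{code:java}
val batchDf = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  // batch reads take both bounds; "latest" here means the end offsets
  // at the time the query runs
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
{code}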

 



> spark-sql-kafka-0-10 startingOffset=earliest not working as expected with 
> streaming
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-27218
>                 URL: https://issues.apache.org/jira/browse/SPARK-27218
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Windows 10, spark-2.4.0-bin-hadoop2.7
>            Reporter: Emanuele Sabellico
>            Priority: Minor
>
> Hi, I'm trying to stream a Kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 
> with code like this:
> {noformat}
> spark.readStream
> .format("kafka")
> .option("subscribe", "test1")
> .option("startingOffsets", "earliest")
> .load(){noformat}
> I find that Spark doesn't start from the earliest offset but from the latest. 
> Or rather: initially it gets the earliest offsets, but then it does a seek to 
> the end, skipping the messages in between.
>  In the logs I find this:
> {noformat}
> 2019-03-20 11:59:50 INFO  KafkaMicroBatchReader:54 - Initial offsets: 
> {"test1":{"0":1740}}
> 2019-03-20 11:59:50 INFO  Fetcher:583 - [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
>  Resetting offset for partition test1-0 to offset 15922.
> {noformat}
> Looking into the code, I find that in _KafkaMicroBatchReader.setOffsetRange_ 
> the method _KafkaOffsetReader.fetchLatestOffsets_ is called, and in there is a 
> _consumer.seekToEnd(partitions)_.
> According to the documentation, I was expecting the streaming to start from 
> the earliest offset in this case. Is there something I'm misunderstanding, or 
> doing wrong?
> Thanks in advance!
>  


