[ https://issues.apache.org/jira/browse/SPARK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799009#comment-16799009 ]
Emanuele Sabellico edited comment on SPARK-27218 at 3/22/19 1:28 PM:
---------------------------------------------------------------------

Hi Gabor, I'm using Structured Streaming and calling load as in the first example here: [https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html]
{code:java}
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
{code}
The issue concerns the first time the stream is started, before the checkpoint has been written. I'd like to start from the earliest offset of the topic, not from the latest: read the messages that are already in the topic and then continue with the streaming. The doc of _startingOffsets_ says:
{noformat}
The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. For streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.
{noformat}
I understand that earliest can be used with readStream, and in fact nothing stops me from setting it, but it doesn't work.

> spark-sql-kafka-0-10 startingOffset=earliest not working as expected with
> streaming
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-27218
>                 URL: https://issues.apache.org/jira/browse/SPARK-27218
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>        Environment: Windows 10, spark-2.4.0-bin-hadoop2.7
>            Reporter: Emanuele Sabellico
>            Priority: Minor
>
> Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0
> with code like this:
> {noformat}
> spark.readStream
>   .format("kafka")
>   .option("subscribe", "test1")
>   .option("startingOffsets", "earliest")
>   .load(){noformat}
> I find that Spark doesn't start from the earliest offset but from the latest.
> Or better: initially it gets the earliest offsets, but then it does a seek to
> end, skipping the messages in-between.
> In the logs I find this:
> {noformat}
> 2019-03-20 11:59:50 INFO KafkaMicroBatchReader:54 - Initial offsets:
> {"test1":{"0":1740}}
> 2019-03-20 11:59:50 INFO Fetcher:583 - [Consumer clientId=consumer-1,
> groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
> Resetting offset for partition test1-0 to offset 15922.
> {noformat}
> Looking into the code, I find that _KafkaMicroBatchReader.setOffsetRange_
> calls _KafkaOffsetReader.fetchLatestOffsets_, which does a
> _consumer.seekToEnd(partitions)_.
> According to the documentation, I was expecting the streaming to start from
> the earliest offset in this case. Is there something that I'm getting wrong
> or doing wrong?
> Thanks in advance!

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
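For reference, the per-partition JSON form described in the _startingOffsets_ doc quoted above would look roughly like the sketch below. The topic name and partition number are illustrative only; per the doc, -2 refers to earliest and -1 to latest:
{code:java}
// Sketch, not a confirmed workaround: pin partition 0 of "test1"
// to the earliest offset (-2) via the JSON form of startingOffsets.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "test1")
  // JSON string mapping topic -> partition -> offset; -2 = earliest
  .option("startingOffsets", """{"test1":{"0":-2}}""")
  .load()
{code}
Whether this form avoids the seek-to-end behavior reported above is exactly what the issue is about, so it is shown only to illustrate the documented option syntax.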