Zhongwei Zhu created SPARK-32044:
------------------------------------

             Summary: Kafka continuous processing prints misleading initial 
offsets log 
                 Key: SPARK-32044
                 URL: https://issues.apache.org/jira/browse/SPARK-32044
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.6
            Reporter: Zhongwei Zhu
             Fix For: 2.4.7


When using Structured Streaming in continuous processing mode, a restarted 
Spark job correctly picks up the offsets of the last epoch from the checkpoint 
location. However, it always prints the following log:

20/06/12 00:58:09 INFO [stream execution thread for [id = 
34e5b909-f9fe-422a-89c0-081251a68693, runId = 
0246e19d-aaa1-4a5c-9091-bab1a0578a0a]] kafka010.KafkaContinuousReader: Initial 
offsets: 
\{"kafka_topic":{"8":51618236,"11":51610655,"2":51622889,"5":51637171,"14":51637346,"13":51627784,"4":51606960,"7":51632475,"1":51636129,"10":51632212,"9":51634107,"3":51611013,"12":51626567,"15":51640774,"6":51637823,"0":51629106}}

This log is misleading, as Spark did not actually use these offsets as the 
initial offsets. It is caused by the following code in KafkaContinuousReader:
{code:java}
offset = start.orElse {
  val offsets = initialOffsets match {
    case EarliestOffsetRangeLimit =>
      KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
    case LatestOffsetRangeLimit =>
      KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
    case SpecificOffsetRangeLimit(p) =>
      offsetReader.fetchSpecificOffsets(p, reportDataLoss)
  }
  logInfo(s"Initial offsets: $offsets")
  offsets
}
{code}
The code inside the orElse block is always executed, even when start has a 
value: start is a java.util.Optional, and Optional.orElse takes an 
already-evaluated value rather than a supplier, so the fallback offsets are 
fetched and logged unconditionally. Using orElseGet with a supplier would 
defer that work to the empty case.
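The eager-evaluation behavior of java.util.Optional.orElse can be shown in 
isolation. Below is a minimal sketch; fetchInitialOffsets is a hypothetical 
stand-in for the fetch-and-log work inside the orElse block, not Spark code:

```java
import java.util.Optional;

public class OrElseDemo {
    static int fallbackCalls = 0;

    // Stand-in for the expensive fetch + logInfo in KafkaContinuousReader.
    static String fetchInitialOffsets() {
        fallbackCalls++;
        return "earliest";
    }

    public static void main(String[] args) {
        Optional<String> start = Optional.of("offsets-from-checkpoint");

        // orElse evaluates its argument eagerly: fetchInitialOffsets() runs
        // (and would log) even though `start` already holds a value.
        String eager = start.orElse(fetchInitialOffsets());
        System.out.println(eager + " fallbackCalls=" + fallbackCalls);

        // orElseGet takes a Supplier, so the fallback only runs when the
        // Optional is empty; no spurious fetch, no misleading log line.
        fallbackCalls = 0;
        String lazy = start.orElseGet(OrElseDemo::fetchInitialOffsets);
        System.out.println(lazy + " fallbackCalls=" + fallbackCalls);
    }
}
```

In both calls the checkpoint value is returned, but only orElse triggers the 
fallback side effect, which is exactly why the "Initial offsets" line appears 
on every restart.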

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
