[ https://issues.apache.org/jira/browse/SPARK-32044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-32044:
------------------------------------

    Assignee: Apache Spark

> [SS] Kakfa continuous processing print mislead initial offsets log
> -------------------------------------------------------------------
>
>                 Key: SPARK-32044
>                 URL: https://issues.apache.org/jira/browse/SPARK-32044
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.6
>            Reporter: Zhongwei Zhu
>            Assignee: Apache Spark
>            Priority: Trivial
>             Fix For: 2.4.7
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When using Structured Streaming in continuous processing mode, after
> restarting the Spark job, the job correctly picks up the offsets of the
> last epoch from the checkpoint location. But it always prints the log below:
> 20/06/12 00:58:09 INFO [stream execution thread for [id =
> 34e5b909-f9fe-422a-89c0-081251a68693, runId =
> 0246e19d-aaa1-4a5c-9091-bab1a0578a0a]] kafka010.KafkaContinuousReader:
> Initial offsets:
> \{"kafka_topic":{"8":51618236,"11":51610655,"2":51622889,"5":51637171,"14":51637346,"13":51627784,"4":51606960,"7":51632475,"1":51636129,"10":51632212,"9":51634107,"3":51611013,"12":51626567,"15":51640774,"6":51637823,"0":51629106}}
> This log is misleading, because Spark did not use these values as the
> initial offsets.
> This is caused by the code below in KafkaContinuousReader:
> {code:java}
> offset = start.orElse {
>   val offsets = initialOffsets match {
>     case EarliestOffsetRangeLimit =>
>       KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
>     case LatestOffsetRangeLimit =>
>       KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
>     case SpecificOffsetRangeLimit(p) =>
>       offsetReader.fetchSpecificOffsets(p, reportDataLoss)
>   }
>   logInfo(s"Initial offsets: $offsets")
>   offsets
> }
> {code}
> The code inside the orElse block is always executed, even when start has a
> value.
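The behaviour described above can be reproduced outside Spark. The following is a minimal sketch, assuming that `start` is a `java.util.Optional` (the issue text does not state its type). `Optional.orElse` takes an already-computed value, so its argument is evaluated before the call, whereas `Optional.orElseGet` takes a supplier that runs only when the optional is empty. The class `OrElseDemo`, the `LOG` list, and `computeInitialOffsets` are hypothetical stand-ins for illustration and are not Spark code, and this is not necessarily how the issue was fixed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class OrElseDemo {
    // Hypothetical in-memory log so the side effect can be observed.
    static final List<String> LOG = new ArrayList<>();

    // Stand-in for the fallback branch: computes default offsets and logs them.
    static String computeInitialOffsets() {
        String offsets = "{\"kafka_topic\":{\"0\":0}}";
        LOG.add("Initial offsets: " + offsets);
        return offsets;
    }

    // Eager: the argument of orElse is evaluated before orElse is called,
    // so the log line is written even when `start` already holds a value.
    static String eager(Optional<String> start) {
        return start.orElse(computeInitialOffsets());
    }

    // Lazy: orElseGet invokes the supplier only when `start` is empty.
    static String lazy(Optional<String> start) {
        return start.orElseGet(OrElseDemo::computeInitialOffsets);
    }

    public static void main(String[] args) {
        // Pretend these offsets were restored from the checkpoint.
        Optional<String> checkpointed =
            Optional.of("{\"kafka_topic\":{\"0\":51629106}}");

        LOG.clear();
        eager(checkpointed);
        System.out.println("eager log lines: " + LOG.size());

        LOG.clear();
        lazy(checkpointed);
        System.out.println("lazy log lines: " + LOG.size());
    }
}
```

In both variants the returned value is the checkpointed one. With a present value, the eager variant still records one log line and the lazy variant records none, which matches the symptom in the report: the offsets used are correct, only the log is misleading.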

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org