[ https://issues.apache.org/jira/browse/SPARK-32044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhongwei Zhu updated SPARK-32044:
---------------------------------
    Summary: [SS] 2.4 Kafka continuous processing prints misleading initial offsets 
log   (was: [SS] 2.4 Kakfa continuous processing print mislead initial offsets 
log )

> [SS] 2.4 Kafka continuous processing prints misleading initial offsets log 
> --------------------------------------------------------------------------
>
>                 Key: SPARK-32044
>                 URL: https://issues.apache.org/jira/browse/SPARK-32044
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.6
>            Reporter: Zhongwei Zhu
>            Priority: Trivial
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When using Structured Streaming in continuous processing mode, a restarted 
> Spark job correctly picks up the offsets of the last epoch from the checkpoint 
> location, but it always prints the log below:
> 20/06/12 00:58:09 INFO [stream execution thread for [id = 
> 34e5b909-f9fe-422a-89c0-081251a68693, runId = 
> 0246e19d-aaa1-4a5c-9091-bab1a0578a0a]] kafka010.KafkaContinuousReader: 
> Initial offsets: 
> \{"kafka_topic":{"8":51618236,"11":51610655,"2":51622889,"5":51637171,"14":51637346,"13":51627784,"4":51606960,"7":51632475,"1":51636129,"10":51632212,"9":51634107,"3":51611013,"12":51626567,"15":51640774,"6":51637823,"0":51629106}}
> This log is misleading because Spark does not actually use these offsets as the 
> initial offsets. It also triggers an unnecessary Kafka offset fetch. The cause 
> is the following code in KafkaContinuousReader:
> {code:scala}
> offset = start.orElse {
>   val offsets = initialOffsets match {
>     case EarliestOffsetRangeLimit =>
>       KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
>     case LatestOffsetRangeLimit =>
>       KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
>     case SpecificOffsetRangeLimit(p) =>
>       offsetReader.fetchSpecificOffsets(p, reportDataLoss)
>   }
>   logInfo(s"Initial offsets: $offsets")
>   offsets
> }
> {code}
>  The code inside the orElse block is always executed even when start has a 
> value, because start is a java.util.Optional and Optional.orElse takes its 
> fallback argument by value, so the argument is evaluated eagerly.
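>  
> Below is a minimal standalone sketch (hypothetical names, offsets simplified 
> to strings; not the actual Spark code) showing the difference between 
> Optional.orElse, which evaluates its argument eagerly, and Optional.orElseGet, 
> which only runs the fallback when the Optional is empty:
> {code:scala}
> import java.util.Optional
> 
> // Hypothetical stand-in for the offset fetch and "Initial offsets" log in KafkaContinuousReader.
> def fetchAndLogInitialOffsets(): String = {
>   println("Initial offsets: {...}") // side effect we only want when no start offset exists
>   "fetched-offsets"
> }
> 
> val start: Optional[String] = Optional.of("checkpointed-offsets")
> 
> // orElse takes the fallback by value, so fetchAndLogInitialOffsets() runs and
> // logs even though start is present, which mirrors the reported behavior.
> val eager = start.orElse(fetchAndLogInitialOffsets())
> 
> // orElseGet takes a Supplier (Scala 2.12+ SAM conversion), so the fallback
> // only runs when start is empty.
> val deferred = start.orElseGet(() => fetchAndLogInitialOffsets())
> {code}
> One possible fix along these lines would be to use orElseGet (or to check 
> start.isPresent before computing the fallback); this is only a suggestion, not 
> an attached patch.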
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
