[ https://issues.apache.org/jira/browse/SPARK-40734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17620167#comment-17620167 ]

Yang Jie commented on SPARK-40734:
----------------------------------

It looks like a flaky test: re-running it succeeds, but the cause of the failure has not been found yet.
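
Both failures below appear to be 30-second harness timeouts while the stream thread is still alive and busy committing the offset log through the local Hadoop filesystem (see the thread stack traces in the output). For reference, the wait that times out in the first test looks roughly like the hedged sketch below; it assumes the surrounding StreamTest/KafkaSourceTest traits, and sources, expectedOffsets and timeoutMs are illustrative placeholders rather than the exact Spark source:

{code:scala}
// Hedged sketch (not the verbatim StreamTest source) of the wait behind
// "Timed out waiting for stream: The code passed to failAfter did not complete
// within 30 seconds": before CheckAnswer verifies rows, the harness waits for
// every source to reach the offsets the test has just added.
failAfter(streamingTimeout) { // failAfter and streamingTimeout come from the test traits
  sources.zipWithIndex.foreach { case (source, i) =>
    // awaitOffset blocks on a condition variable inside StreamExecution until the
    // offset log catches up; expectedOffsets and timeoutMs are placeholders here.
    currentStream.awaitOffset(i, expectedOffsets(i), timeoutMs)
  }
}
{code}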

 
{code:java}
- ensure stream-stream self-join generates only one offset in log and correct metrics *** FAILED ***
  Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds.
  java.base/java.lang.Thread.getStackTrace(Thread.java:2550)
        org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:277)
        org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
        org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
        org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53)
        org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:479)
        org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:478)
        scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
        scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
        scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
  
        Caused by:      null
        java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1766)
                org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:465)
                org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:480)
                scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
                org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
                org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
                org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
                org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
                org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53)
                org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:479)
  
  
  == Progress ==
     AssertOnQuery(<condition>, )
     AddKafkaData(topics = Set(topic-51), data = WrappedArray(1, 2), message = )
  => CheckAnswer: [1,1,1],[2,2,2]
     AddKafkaData(topics = Set(topic-51), data = WrappedArray(6, 3), message = )
     CheckAnswer: [1,1,1],[2,2,2],[3,3,3],[1,6,1],[1,1,6],[1,6,6]
     AssertOnQuery(<condition>, )
  
  == Stream ==
  Output Mode: Append
  Stream state: {KafkaV2[Subscribe[topic-51]]: {"topic-51":{"1":0,"0":1}}}
  Thread state: alive
  Thread stack trace: java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
  java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:319)
  java.base/java.lang.ProcessImpl.start(ProcessImpl.java:249)
  java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1110)
  java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1073)
  org.apache.hadoop.util.Shell.runCommand(Shell.java:937)
  org.apache.hadoop.util.Shell.run(Shell.java:900)
  org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
  org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
  org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
  org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
  org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)
  org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)
  org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)
  org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
  org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
  org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
  org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
  org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:360)
  org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
  org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
  org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
  org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
  org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
  org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
  org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:359)
  org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
  org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
  org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:365)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
  scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  scala.Option.getOrElse(Option.scala:189)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
  org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$13(MicroBatchExecution.scala:509)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:507)
  scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:726)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:447)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:252)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:233)
  org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:227)
  org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
  org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
  
  
  == Sink ==
  0: 
  1: [1,1,1]
  
  
  == Plan ==
  == Parsed Logical Plan ==
  WriteToMicroBatchDataSource MemorySink, 9d3b23d7-bb0b-4005-9e5e-f73b25f4ea37, Append, 1
  +- Project [key#71341, value#71340, value#71344]
     +- Join Inner, (key#71341 = key#71345)
        :- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
        :  +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
        +- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
           +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}

  == Analyzed Logical Plan ==
  WriteToMicroBatchDataSource MemorySink, 9d3b23d7-bb0b-4005-9e5e-f73b25f4ea37, Append, 1
  +- Project [key#71341, value#71340, value#71344]
     +- Join Inner, (key#71341 = key#71345)
        :- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
        :  +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
        +- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
           +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}

  == Optimized Logical Plan ==
  WriteToDataSourceV2 MicroBathWrite[epoch: 1, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@76374379]
  +- Project [key#71341, value#71340, value#71344]
     +- Join Inner, (key#71341 = key#71345)
        :- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
        :  +- Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
        :     +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
        +- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
           +- Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
              +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}

  == Physical Plan ==
  WriteToDataSourceV2 MicroBathWrite[epoch: 1, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@76374379], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5924/0x00000008023e0000@3079e1be
  +- *(3) Project [key#71341, value#71340, value#71344]
     +- StreamingSymmetricHashJoin [key#71341], [key#71345], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/Users/yangjie01/SourceCode/git/spark-mine-12/connector/kafka-0-10-sql/target/tmp/streaming.metadata-35e5abc4-7986-478a-a06d-a5029055402f/state, runId = 8f8665b7-883b-4dae-9f2d-972f08219a59, opId = 0, ver = 1, numPartitions = 5], 0, state cleanup [ left = null, right = null ], 2
        :- Exchange hashpartitioning(key#71341, 5), ENSURE_REQUIREMENTS, [plan_id=108033]
        :  +- *(1) Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
        :     +- *(1) Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
        :        +- MicroBatchScan[key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan
        +- Exchange hashpartitioning(key#71345, 5), ENSURE_REQUIREMENTS, [plan_id=108038]
           +- *(2) Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
              +- *(2) Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
                 +- MicroBatchScan[key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:460)
- read Kafka transactional messages: read_committed *** FAILED ***
  Assert on query failed: Execute: The code passed to eventually never returned normally. Attempted 1694 times over 30.010655291000003 seconds. Last failure message: clock.isStreamWaitingAt(clock.getTimeMillis()) was false.
  org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:219)
        org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
        org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348)
        org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347)
        org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:53)
        org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$88(KafkaMicroBatchSourceSuite.scala:1063)
        org.apache.spark.sql.streaming.StreamTest$Execute$.$anonfun$apply$4(StreamTest.scala:303)
        org.apache.spark.sql.streaming.StreamTest$Execute$.$anonfun$apply$4$adapted(StreamTest.scala:303)
        org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$41(StreamTest.scala:661)
        org.apache.spark.sql.streaming.StreamTest.verify$1(StreamTest.scala:430)
  
        Caused by:      clock.isStreamWaitingAt(clock.getTimeMillis()) was false
        org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
                org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
                org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
                org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
                org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$89(KafkaMicroBatchSourceSuite.scala:1065)
                org.scalatest.enablers.Retrying$$anon$4.makeAValiantAttempt$1(Retrying.scala:184)
                org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:196)
                org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
                org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348)
                org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347)
  
  
  == Progress ==
     StartStream(ProcessingTimeTrigger(100),org.apache.spark.sql.streaming.util.StreamManualClock@35e37a97,Map(),null)
     AssertOnQuery(<condition>, Execute)
     CheckAnswer: 
     AssertOnQuery(<condition>, Run Kafka Producer)
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: 
     AssertOnQuery(<condition>, Run Kafka Producer)
     AdvanceManualClock(100)
  => AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [0],[1],[2]
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [3],[4]
     AssertOnQuery(<condition>, Run Kafka Producer)
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: 
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: 
     AssertOnQuery(<condition>, Run Kafka Producer)
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [12],[13],[14]
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [15],[16]
     AssertOnQuery(<condition>, Run Kafka Producer)
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [18],[20]
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [22],[23]
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: 
  
  == Stream ==
  Output Mode: Append
  Stream state: {KafkaV2[Subscribe[topic-52]]: {"topic-52":{"0":0}}}
  Thread state: alive
  Thread stack trace: java.base/java.io.FileInputStream.readBytes(Native Method)
  java.base/java.io.FileInputStream.read(FileInputStream.java:293)
  java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:308)
  java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:382)
  java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:367)
  java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:333)
  java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:376)
  java.base/sun.nio.cs.StreamDecoder.lockedRead(StreamDecoder.java:219)
  java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:173)
  java.base/java.io.InputStreamReader.read(InputStreamReader.java:189)
  java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
  java.base/java.io.BufferedReader.read1(BufferedReader.java:226)
  java.base/java.io.BufferedReader.implRead(BufferedReader.java:315)
  java.base/java.io.BufferedReader.read(BufferedReader.java:297)
  org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:1225)
  org.apache.hadoop.util.Shell.runCommand(Shell.java:993)
  org.apache.hadoop.util.Shell.run(Shell.java:900)
  org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
  org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
  org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
  org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212)
  org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113)
  org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1102)
  org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileSystem.java:1073)
  org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1590)
  org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:206)
  org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:790)
  org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
  org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:496)
  org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
  org.apache.hadoop.fs.FileContext.rename(FileContext.java:1036)
  org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:370)
  org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:176)
  scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  scala.Option.getOrElse(Option.scala:189)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
  org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$13(MicroBatchExecution.scala:509)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:507)
  scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:726)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:447)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:252)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:233)
  org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:227)
  org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
  org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
  
  
  == Sink ==
  0: 
  
  
  == Plan ==
  == Parsed Logical Plan ==
  WriteToMicroBatchDataSource MemorySink, 153f4917-cee4-4d8d-bf51-019ec25b6cd2, Append, 0
  +- SerializeFromObject [input[0, int, false] AS value#71573]
     +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
           +- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
              +- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}}

  == Analyzed Logical Plan ==
  WriteToMicroBatchDataSource MemorySink, 153f4917-cee4-4d8d-bf51-019ec25b6cd2, Append, 0
  +- SerializeFromObject [input[0, int, false] AS value#71573]
     +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
           +- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
              +- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}}

  == Optimized Logical Plan ==
  WriteToDataSourceV2 MicroBathWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@51b1ac59]
  +- SerializeFromObject [input[0, int, false] AS value#71573]
     +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
           +- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
              +- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}}

  == Physical Plan ==
  WriteToDataSourceV2 MicroBathWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@51b1ac59], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5924/0x00000008023e0000@670a07bf
  +- *(1) SerializeFromObject [input[0, int, false] AS value#71573]
     +- *(1) MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, obj#71572: int
        +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
           +- *(1) Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
              +- MicroBatchScan[key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:460)
{code}
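
In both thread dumps the stream thread is inside Hadoop RawLocalFileSystem/Shell calls made while writing or renaming the offset-log file, so the harness appears to give up before the batch is committed rather than the trigger logic being wrong. The second failure comes out of the manual-clock wait in the read_committed test; a hedged sketch of that helper (approximating KafkaMicroBatchSourceSuite.scala around lines 1063-1065, not the verbatim suite code) is:

{code:scala}
// Hedged sketch, assuming it sits inside a suite that mixes in KafkaSourceTest /
// StreamTest, which provide AssertOnQuery, eventually, timeout and streamingTimeout.
import org.apache.spark.sql.streaming.util.StreamManualClock

val clock = new StreamManualClock

val waitUntilBatchProcessed = AssertOnQuery { q =>
  eventually(timeout(streamingTimeout)) {
    if (q.exception.isEmpty) {
      // This is the assertion that failed: the stream thread was still renaming
      // the offset-log file instead of waiting at the current manual-clock time.
      assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    }
  }
  if (q.exception.isDefined) {
    throw q.exception.get
  }
  true
}
{code}

If that reading is right, the wait is simply slower than the 30-second limit on a loaded machine, which would be consistent with the re-run succeeding.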
 

 

> KafkaMicroBatchV2SourceWithAdminSuite failed
> --------------------------------------------
>
>                 Key: SPARK-40734
>                 URL: https://issues.apache.org/jira/browse/SPARK-40734
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>    Affects Versions: 3.4.0
>            Reporter: Yang Jie
>            Priority: Minor
>
> - ensure stream-stream self-join generates only one offset in log and correct metrics *** FAILED ***
> - read Kafka transactional messages: read_committed *** FAILED ***
> Failure reason to be supplemented


