[ https://issues.apache.org/jira/browse/SPARK-40734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17620167#comment-17620167 ]
Yang Jie commented on SPARK-40734:
----------------------------------

It looks like a flaky test: a re-run will succeed, but the cause of the failure has not been found yet.

{code:java}
- ensure stream-stream self-join generates only one offset in log and correct metrics *** FAILED *** Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds. java.base/java.lang.Thread.getStackTrace(Thread.java:2550) org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:277) org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231) org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230) org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53) org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:479) org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:478) scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) Caused by: null java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1766) org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:465) org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:480) scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127) org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282) org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231) org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230) org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53) org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:479) == Progress == AssertOnQuery(<condition>, ) AddKafkaData(topics = Set(topic-51), data = WrappedArray(1, 2), message = ) => CheckAnswer: [1,1,1],[2,2,2] AddKafkaData(topics = Set(topic-51), data = WrappedArray(6, 3), message = ) CheckAnswer: [1,1,1],[2,2,2],[3,3,3],[1,6,1],[1,1,6],[1,6,6] AssertOnQuery(<condition>, ) == Stream == Output Mode: Append Stream state: {KafkaV2[Subscribe[topic-51]]: {"topic-51":{"1":0,"0":1}}} Thread state: alive Thread stack trace: java.base/java.lang.ProcessImpl.forkAndExec(Native Method) java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:319) java.base/java.lang.ProcessImpl.start(ProcessImpl.java:249) java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1110) java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1073) org.apache.hadoop.util.Shell.runCommand(Shell.java:937) org.apache.hadoop.util.Shell.run(Shell.java:900) org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212) org.apache.hadoop.util.Shell.execCommand(Shell.java:1306) org.apache.hadoop.util.Shell.execCommand(Shell.java:1288) org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978) org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324) org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294) org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439) org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459) org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305) org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102) org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:360) org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400) org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626) org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701) org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697) org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) org.apache.hadoop.fs.FileContext.create(FileContext.java:703) org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:359) org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140) org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143) org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:365) org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173) scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) scala.Option.getOrElse(Option.scala:189) org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171) org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116) org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$13(MicroBatchExecution.scala:509) scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382) org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:507) scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:726) org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:447) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:252) scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382) org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:233) org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67) org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:227) 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307) scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285) org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208) == Sink == 0: 1: [1,1,1] == Plan == == Parsed Logical Plan == WriteToMicroBatchDataSource MemorySink, 9d3b23d7-bb0b-4005-9e5e-f73b25f4ea37, Append, 1 +- Project [key#71341, value#71340, value#71344] +- Join Inner, (key#71341 = key#71345) :- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341] : +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}} +- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345] +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}} == Analyzed Logical Plan == WriteToMicroBatchDataSource MemorySink, 9d3b23d7-bb0b-4005-9e5e-f73b25f4ea37, Append, 1 +- Project [key#71341, value#71340, value#71344] +- Join Inner, (key#71341 = key#71345) :- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341] : +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}} +- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345] +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}} == Optimized Logical Plan == WriteToDataSourceV2 MicroBathWrite[epoch: 1, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@76374379] +- Project [key#71341, value#71340, value#71344] +- Join Inner, (key#71341 = key#71345) :- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341] : +- Filter isnotnull((cast(cast(value#71327 as string) as int) % 5)) : +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}} +- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345] +- Filter 
isnotnull((cast(cast(value#71327 as string) as int) % 5)) +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}} == Physical Plan == WriteToDataSourceV2 MicroBathWrite[epoch: 1, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@76374379], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5924/0x00000008023e0000@3079e1be +- *(3) Project [key#71341, value#71340, value#71344] +- StreamingSymmetricHashJoin [key#71341], [key#71345], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/Users/yangjie01/SourceCode/git/spark-mine-12/connector/kafka-0-10-sql/target/tmp/streaming.metadata-35e5abc4-7986-478a-a06d-a5029055402f/state, runId = 8f8665b7-883b-4dae-9f2d-972f08219a59, opId = 0, ver = 1, numPartitions = 5], 0, state cleanup [ left = null, right = null ], 2 :- Exchange hashpartitioning(key#71341, 5), ENSURE_REQUIREMENTS, [plan_id=108033] : +- *(1) Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341] : +- *(1) Filter isnotnull((cast(cast(value#71327 as string) as int) % 5)) : +- MicroBatchScan[key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan +- Exchange hashpartitioning(key#71345, 5), ENSURE_REQUIREMENTS, [plan_id=108038] +- *(2) Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345] +- *(2) Filter isnotnull((cast(cast(value#71327 as string) as int) % 5)) +- MicroBatchScan[key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:460) - read Kafka transactional messages: read_committed *** FAILED *** Assert on query failed: Execute: The code passed to eventually never returned normally. Attempted 1694 times over 30.010655291000003 seconds. Last failure message: clock.isStreamWaitingAt(clock.getTimeMillis()) was false. 
org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:219) org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226) org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348) org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347) org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:53) org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$88(KafkaMicroBatchSourceSuite.scala:1063) org.apache.spark.sql.streaming.StreamTest$Execute$.$anonfun$apply$4(StreamTest.scala:303) org.apache.spark.sql.streaming.StreamTest$Execute$.$anonfun$apply$4$adapted(StreamTest.scala:303) org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$41(StreamTest.scala:661) org.apache.spark.sql.streaming.StreamTest.verify$1(StreamTest.scala:430) Caused by: clock.isStreamWaitingAt(clock.getTimeMillis()) was false org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$89(KafkaMicroBatchSourceSuite.scala:1065) org.scalatest.enablers.Retrying$$anon$4.makeAValiantAttempt$1(Retrying.scala:184) org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:196) org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226) org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348) org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347) == Progress == StartStream(ProcessingTimeTrigger(100),org.apache.spark.sql.streaming.util.StreamManualClock@35e37a97,Map(),null) AssertOnQuery(<condition>, Execute) CheckAnswer: AssertOnQuery(<condition>, Run Kafka Producer) AdvanceManualClock(100) AssertOnQuery(<condition>, Execute) CheckNewAnswer: AssertOnQuery(<condition>, Run Kafka Producer) AdvanceManualClock(100) => AssertOnQuery(<condition>, Execute) CheckNewAnswer: [0],[1],[2] AdvanceManualClock(100) AssertOnQuery(<condition>, Execute) CheckNewAnswer: [3],[4] AssertOnQuery(<condition>, Run Kafka Producer) AdvanceManualClock(100) AssertOnQuery(<condition>, Execute) CheckNewAnswer: AdvanceManualClock(100) AssertOnQuery(<condition>, Execute) CheckNewAnswer: AssertOnQuery(<condition>, Run Kafka Producer) AdvanceManualClock(100) AssertOnQuery(<condition>, Execute) CheckNewAnswer: [12],[13],[14] AdvanceManualClock(100) AssertOnQuery(<condition>, Execute) CheckNewAnswer: [15],[16] AssertOnQuery(<condition>, Run Kafka Producer) AdvanceManualClock(100) AssertOnQuery(<condition>, Execute) CheckNewAnswer: [18],[20] AdvanceManualClock(100) AssertOnQuery(<condition>, Execute) CheckNewAnswer: [22],[23] AdvanceManualClock(100) AssertOnQuery(<condition>, Execute) CheckNewAnswer: == Stream == Output Mode: Append Stream state: {KafkaV2[Subscribe[topic-52]]: {"topic-52":{"0":0}}} Thread state: alive Thread stack trace: java.base/java.io.FileInputStream.readBytes(Native Method) java.base/java.io.FileInputStream.read(FileInputStream.java:293) java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:308) java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:382) java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:367) java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:333) java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:376) 
java.base/sun.nio.cs.StreamDecoder.lockedRead(StreamDecoder.java:219) java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:173) java.base/java.io.InputStreamReader.read(InputStreamReader.java:189) java.base/java.io.BufferedReader.fill(BufferedReader.java:161) java.base/java.io.BufferedReader.read1(BufferedReader.java:226) java.base/java.io.BufferedReader.implRead(BufferedReader.java:315) java.base/java.io.BufferedReader.read(BufferedReader.java:297) org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:1225) org.apache.hadoop.util.Shell.runCommand(Shell.java:993) org.apache.hadoop.util.Shell.run(Shell.java:900) org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212) org.apache.hadoop.util.Shell.execCommand(Shell.java:1306) org.apache.hadoop.util.Shell.execCommand(Shell.java:1288) org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212) org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113) org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1102) org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileSystem.java:1073) org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1590) org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:206) org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:790) org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720) org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:496) org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720) org.apache.hadoop.fs.FileContext.rename(FileContext.java:1036) org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:370) org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154) org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:176) scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) scala.Option.getOrElse(Option.scala:189) org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171) org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116) org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$13(MicroBatchExecution.scala:509) scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382) org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:507) scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:726) org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:447) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:252) 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382) org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:233) org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67) org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:227) org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307) scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285) org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208) == Sink == 0: == Plan == == Parsed Logical Plan == WriteToMicroBatchDataSource MemorySink, 153f4917-cee4-4d8d-bf51-019ec25b6cd2, Append, 0 +- SerializeFromObject [input[0, int, false] AS value#71573] +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2 +- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562] +- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}} == Analyzed Logical Plan == WriteToMicroBatchDataSource MemorySink, 153f4917-cee4-4d8d-bf51-019ec25b6cd2, Append, 0 +- SerializeFromObject [input[0, int, false] AS value#71573] +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2 +- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562] +- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}} == Optimized Logical Plan == WriteToDataSourceV2 MicroBathWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@51b1ac59] +- SerializeFromObject [input[0, int, false] AS value#71573] +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2 +- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS 
value#71562] +- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}} == Physical Plan == WriteToDataSourceV2 MicroBathWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@51b1ac59], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5924/0x00000008023e0000@670a07bf +- *(1) SerializeFromObject [input[0, int, false] AS value#71573] +- *(1) MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, obj#71572: int +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2 +- *(1) Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562] +- MicroBatchScan[key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:460)
{code}

> KafkaMicroBatchV2SourceWithAdminSuite failed
> --------------------------------------------
>
>                  Key: SPARK-40734
>                  URL: https://issues.apache.org/jira/browse/SPARK-40734
>              Project: Spark
>           Issue Type: Sub-task
>           Components: Structured Streaming
>     Affects Versions: 3.4.0
>             Reporter: Yang Jie
>             Priority: Minor
>
> - ensure stream-stream self-join generates only one offset in log and correct metrics *** FAILED ***
> - read Kafka transactional messages: read_committed *** FAILED ***
> Failure reason to be supplemented
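For context on the two timeouts in the log above: both failures surface through ScalaTest's timeout helpers that appear in the stack traces (TimeLimits.failAfter for the self-join test, Eventually.eventually for the read_committed test). Below is a minimal, self-contained sketch of those two helpers, for illustration only; it is not Spark's StreamTest code, and it assumes ScalaTest 3.x on the classpath.

{code:scala}
import org.scalatest.concurrent.{Eventually, Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.SpanSugar._

// Illustrative sketch only (not Spark's StreamTest code): the two ScalaTest
// helpers that produce the timeout messages quoted in the log above.
object TimeoutHelpersSketch extends App with TimeLimits with Eventually {

  // Interrupt the blocked code on timeout; ScalaTest's default Signaler is DoNotSignal.
  implicit val signaler: Signaler = ThreadSignaler

  // failAfter aborts the enclosed block once the span elapses, yielding
  // "The code passed to failAfter did not complete within 30 seconds".
  failAfter(30.seconds) {
    Thread.sleep(100) // stand-in for waiting on StreamExecution.awaitOffset
  }

  // eventually retries the assertion until it holds or the patience window is
  // exhausted, yielding "The code passed to eventually never returned normally.
  // Attempted N times over 30 seconds".
  eventually(timeout(30.seconds), interval(10.millis)) {
    assert(System.currentTimeMillis() > 0) // stand-in for clock.isStreamWaitingAt(...)
  }
}
{code}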