This is an automated email from the ASF dual-hosted git repository. changchen pushed a commit to branch fix/11550-system-property-trait in repository https://gitbox.apache.org/repos/asf/gluten.git
commit b6002648e82b851eb460bdc891508217e69983a7
Author: Chang chen <[email protected]>
AuthorDate: Wed Apr 8 16:44:11 2026 +0000

    [GLUTEN-11550][UT] Add debug testGluten for JobTagging CI flaky test

    Exclude the original 'Tags set from session are prefixed with session UUID'
    test and add a testGluten version with logError diagnostics to debug the
    CI-only 'head of empty list' failure. The testGluten logs:
    - Main/Future thread identity and managedJobTags state
    - InheritableThreadLocal tag inheritance status
    - Actual job tags observed via SparkListener
    - cancelJobsWithTagWithFuture result

    Local run passes (5/5). The CI failure's root cause is still under investigation.

    Co-authored-by: Copilot <[email protected]>
--- .../gluten/utils/velox/VeloxTestSettings.scala | 1 + ...parkSessionJobTaggingAndCancellationSuite.scala | 94 +++++++++++++++++++++- .../gluten/utils/velox/VeloxTestSettings.scala | 1 + ...parkSessionJobTaggingAndCancellationSuite.scala | 94 +++++++++++++++++++++- 4 files changed, 188 insertions(+), 2 deletions(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index e444bf46e0..a70ec62871 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -816,6 +816,7 @@ class VeloxTestSettings extends BackendTestSettings { // TODO: 4.x enableSuite[GlutenSetCommandSuite] // 1 failure enableSuite[GlutenSparkSessionBuilderSuite] enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite] + .exclude("Tags set from session are prefixed with session UUID") enableSuite[GlutenTPCDSCollationQueryTestSuite] enableSuite[GlutenTPCDSModifiedPlanStabilitySuite] enableSuite[GlutenTPCDSModifiedPlanStabilityWithStatsSuite] diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala index d24641fb1f..a1f336e56d 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala @@ -16,6 +16,98 @@ */ package org.apache.spark.sql +import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.util.ThreadUtils + +import java.util.concurrent.{Semaphore, TimeUnit} + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ + class GlutenSparkSessionJobTaggingAndCancellationSuite extends SparkSessionJobTaggingAndCancellationSuite - with GlutenTestSetWithSystemPropertyTrait {} + with GlutenTestSetWithSystemPropertyTrait + with Logging { + + testGluten("GLUTEN-DEBUG Tags set from session are prefixed with session UUID") { + sc = new SparkContext("local[2]", "test") + val session = classic.SparkSession.builder().sparkContext(sc).getOrCreate() + import session.implicits._ + + val mainThread = Thread.currentThread() + logError( + s"[GLUTEN-DEBUG] Main thread: ${mainThread.getName} " + + s"(id=${mainThread.getId}, class=${mainThread.getClass.getName})") + + val sem = new Semaphore(0) + val jobTags = new java.util.concurrent.ConcurrentHashMap[Int, String]() + sc.addSparkListener(new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + val tags = Option(jobStart.properties.getProperty(SparkContext.SPARK_JOB_TAGS)) + .getOrElse("<null>") + jobTags.put(jobStart.jobId, tags) + logError(s"[GLUTEN-DEBUG] onJobStart: jobId=${jobStart.jobId}, 
tags=$tags") + sem.release() + } + }) + + session.addTag("one") + val mainManagedTags = session.managedJobTags.get().toMap + val mainThreadUuid = session.threadUuid.get() + logError(s"[GLUTEN-DEBUG] Main thread managedJobTags: $mainManagedTags") + logError(s"[GLUTEN-DEBUG] Main thread threadUuid: $mainThreadUuid") + + Future { + val futureThread = Thread.currentThread() + val futureManagedTags = session.managedJobTags.get().toMap + val futureThreadUuid = session.threadUuid.get() + logError( + s"[GLUTEN-DEBUG] Future thread: ${futureThread.getName} " + + s"(id=${futureThread.getId}, class=${futureThread.getClass.getName})") + logError(s"[GLUTEN-DEBUG] Future thread managedJobTags: $futureManagedTags") + logError(s"[GLUTEN-DEBUG] Future thread threadUuid: $futureThreadUuid") + logError( + s"[GLUTEN-DEBUG] Future thread inherited tag 'one': " + + s"${futureManagedTags.contains("one")}") + + session.range(1, 10000).map { i => Thread.sleep(100); i }.count() + }(ExecutionContext.global) + + val acquired = sem.tryAcquire(1, 1, TimeUnit.MINUTES) + logError(s"[GLUTEN-DEBUG] Semaphore acquired: $acquired") + logError(s"[GLUTEN-DEBUG] All job tags observed: $jobTags") + + val realTag = session.managedJobTags.get()("one") + logError(s"[GLUTEN-DEBUG] Looking for tag: $realTag") + + val activeJobsFuture = + session.sparkContext.cancelJobsWithTagWithFuture(realTag, "reason") + val cancelledJobs = ThreadUtils.awaitResult(activeJobsFuture, 60.seconds) + logError(s"[GLUTEN-DEBUG] Cancelled jobs count: ${cancelledJobs.size}") + cancelledJobs.foreach(job => logError(s"[GLUTEN-DEBUG] Cancelled job: ${job.jobId}")) + + assert( + cancelledJobs.nonEmpty, + s"Expected at least one cancelled job. 
" + + s"mainTags=$mainManagedTags, realTag=$realTag, observedJobTags=$jobTags") + + val activeJob = cancelledJobs.head + val actualTags = activeJob.properties + .getProperty(SparkContext.SPARK_JOB_TAGS) + .split(SparkContext.SPARK_JOB_TAGS_SEP) + assert( + actualTags.toSet == Set( + session.sessionJobTag, + s"${session.sessionJobTag}-thread-${session.threadUuid.get()}-one", + SQLExecution.executionIdJobTag( + session, + activeJob.properties + .get(SQLExecution.EXECUTION_ROOT_ID_KEY) + .asInstanceOf[String] + .toLong) + )) + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index c11d8da3b5..5a31f1d6f6 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -800,6 +800,7 @@ class VeloxTestSettings extends BackendTestSettings { // TODO: 4.x enableSuite[GlutenSetCommandSuite] // 1 failure enableSuite[GlutenSparkSessionBuilderSuite] enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite] + .exclude("Tags set from session are prefixed with session UUID") enableSuite[GlutenTPCDSCollationQueryTestSuite] enableSuite[GlutenTPCDSModifiedPlanStabilitySuite] enableSuite[GlutenTPCDSModifiedPlanStabilityWithStatsSuite] diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala index d24641fb1f..a1f336e56d 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala @@ -16,6 +16,98 @@ */ package org.apache.spark.sql +import 
org.apache.spark.SparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.util.ThreadUtils + +import java.util.concurrent.{Semaphore, TimeUnit} + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ + class GlutenSparkSessionJobTaggingAndCancellationSuite extends SparkSessionJobTaggingAndCancellationSuite - with GlutenTestSetWithSystemPropertyTrait {} + with GlutenTestSetWithSystemPropertyTrait + with Logging { + + testGluten("GLUTEN-DEBUG Tags set from session are prefixed with session UUID") { + sc = new SparkContext("local[2]", "test") + val session = classic.SparkSession.builder().sparkContext(sc).getOrCreate() + import session.implicits._ + + val mainThread = Thread.currentThread() + logError( + s"[GLUTEN-DEBUG] Main thread: ${mainThread.getName} " + + s"(id=${mainThread.getId}, class=${mainThread.getClass.getName})") + + val sem = new Semaphore(0) + val jobTags = new java.util.concurrent.ConcurrentHashMap[Int, String]() + sc.addSparkListener(new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + val tags = Option(jobStart.properties.getProperty(SparkContext.SPARK_JOB_TAGS)) + .getOrElse("<null>") + jobTags.put(jobStart.jobId, tags) + logError(s"[GLUTEN-DEBUG] onJobStart: jobId=${jobStart.jobId}, tags=$tags") + sem.release() + } + }) + + session.addTag("one") + val mainManagedTags = session.managedJobTags.get().toMap + val mainThreadUuid = session.threadUuid.get() + logError(s"[GLUTEN-DEBUG] Main thread managedJobTags: $mainManagedTags") + logError(s"[GLUTEN-DEBUG] Main thread threadUuid: $mainThreadUuid") + + Future { + val futureThread = Thread.currentThread() + val futureManagedTags = session.managedJobTags.get().toMap + val futureThreadUuid = session.threadUuid.get() + logError( + s"[GLUTEN-DEBUG] Future thread: ${futureThread.getName} " + 
+ s"(id=${futureThread.getId}, class=${futureThread.getClass.getName})") + logError(s"[GLUTEN-DEBUG] Future thread managedJobTags: $futureManagedTags") + logError(s"[GLUTEN-DEBUG] Future thread threadUuid: $futureThreadUuid") + logError( + s"[GLUTEN-DEBUG] Future thread inherited tag 'one': " + + s"${futureManagedTags.contains("one")}") + + session.range(1, 10000).map { i => Thread.sleep(100); i }.count() + }(ExecutionContext.global) + + val acquired = sem.tryAcquire(1, 1, TimeUnit.MINUTES) + logError(s"[GLUTEN-DEBUG] Semaphore acquired: $acquired") + logError(s"[GLUTEN-DEBUG] All job tags observed: $jobTags") + + val realTag = session.managedJobTags.get()("one") + logError(s"[GLUTEN-DEBUG] Looking for tag: $realTag") + + val activeJobsFuture = + session.sparkContext.cancelJobsWithTagWithFuture(realTag, "reason") + val cancelledJobs = ThreadUtils.awaitResult(activeJobsFuture, 60.seconds) + logError(s"[GLUTEN-DEBUG] Cancelled jobs count: ${cancelledJobs.size}") + cancelledJobs.foreach(job => logError(s"[GLUTEN-DEBUG] Cancelled job: ${job.jobId}")) + + assert( + cancelledJobs.nonEmpty, + s"Expected at least one cancelled job. " + + s"mainTags=$mainManagedTags, realTag=$realTag, observedJobTags=$jobTags") + + val activeJob = cancelledJobs.head + val actualTags = activeJob.properties + .getProperty(SparkContext.SPARK_JOB_TAGS) + .split(SparkContext.SPARK_JOB_TAGS_SEP) + assert( + actualTags.toSet == Set( + session.sessionJobTag, + s"${session.sessionJobTag}-thread-${session.threadUuid.get()}-one", + SQLExecution.executionIdJobTag( + session, + activeJob.properties + .get(SQLExecution.EXECUTION_ROOT_ID_KEY) + .asInstanceOf[String] + .toLong) + )) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
