This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch fix/11550-system-property-trait
in repository https://gitbox.apache.org/repos/asf/gluten.git

commit b6002648e82b851eb460bdc891508217e69983a7
Author: Chang chen <[email protected]>
AuthorDate: Wed Apr 8 16:44:11 2026 +0000

    [GLUTEN-11550][UT] Add debug testGluten for JobTagging CI flaky test
    
    Exclude original 'Tags set from session are prefixed with session UUID'
    test and add a testGluten version with logError diagnostics to debug
    the CI-only 'head of empty list' failure.
    
    The testGluten logs:
    - Main/Future thread identity and managedJobTags state
    - InheritableThreadLocal tag inheritance status
    - Actual job tags observed via SparkListener
    - cancelJobsWithTagWithFuture result
    
    Local run passes (5/5). CI failure root cause still under investigation.
    
    Co-authored-by: Copilot <[email protected]>
---
 .../gluten/utils/velox/VeloxTestSettings.scala     |  1 +
 ...parkSessionJobTaggingAndCancellationSuite.scala | 94 +++++++++++++++++++++-
 .../gluten/utils/velox/VeloxTestSettings.scala     |  1 +
 ...parkSessionJobTaggingAndCancellationSuite.scala | 94 +++++++++++++++++++++-
 4 files changed, 188 insertions(+), 2 deletions(-)

diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index e444bf46e0..a70ec62871 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -816,6 +816,7 @@ class VeloxTestSettings extends BackendTestSettings {
   // TODO: 4.x enableSuite[GlutenSetCommandSuite]  // 1 failure
   enableSuite[GlutenSparkSessionBuilderSuite]
   enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite]
+    .exclude("Tags set from session are prefixed with session UUID")
   enableSuite[GlutenTPCDSCollationQueryTestSuite]
   enableSuite[GlutenTPCDSModifiedPlanStabilitySuite]
   enableSuite[GlutenTPCDSModifiedPlanStabilityWithStatsSuite]
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala
index d24641fb1f..a1f336e56d 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala
@@ -16,6 +16,98 @@
  */
 package org.apache.spark.sql
 
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.util.ThreadUtils
+
+import java.util.concurrent.{Semaphore, TimeUnit}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
+
 class GlutenSparkSessionJobTaggingAndCancellationSuite
   extends SparkSessionJobTaggingAndCancellationSuite
-  with GlutenTestSetWithSystemPropertyTrait {}
+  with GlutenTestSetWithSystemPropertyTrait
+  with Logging {
+
+  testGluten("GLUTEN-DEBUG Tags set from session are prefixed with session UUID") {
+    sc = new SparkContext("local[2]", "test")
+    val session = classic.SparkSession.builder().sparkContext(sc).getOrCreate()
+    import session.implicits._
+
+    val mainThread = Thread.currentThread()
+    logError(
+      s"[GLUTEN-DEBUG] Main thread: ${mainThread.getName} " +
+        s"(id=${mainThread.getId}, class=${mainThread.getClass.getName})")
+
+    val sem = new Semaphore(0)
+    val jobTags = new java.util.concurrent.ConcurrentHashMap[Int, String]()
+    sc.addSparkListener(new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        val tags = Option(jobStart.properties.getProperty(SparkContext.SPARK_JOB_TAGS))
+          .getOrElse("<null>")
+        jobTags.put(jobStart.jobId, tags)
+        logError(s"[GLUTEN-DEBUG] onJobStart: jobId=${jobStart.jobId}, tags=$tags")
+        sem.release()
+      }
+    })
+
+    session.addTag("one")
+    val mainManagedTags = session.managedJobTags.get().toMap
+    val mainThreadUuid = session.threadUuid.get()
+    logError(s"[GLUTEN-DEBUG] Main thread managedJobTags: $mainManagedTags")
+    logError(s"[GLUTEN-DEBUG] Main thread threadUuid: $mainThreadUuid")
+
+    Future {
+      val futureThread = Thread.currentThread()
+      val futureManagedTags = session.managedJobTags.get().toMap
+      val futureThreadUuid = session.threadUuid.get()
+      logError(
+        s"[GLUTEN-DEBUG] Future thread: ${futureThread.getName} " +
+          s"(id=${futureThread.getId}, class=${futureThread.getClass.getName})")
+      logError(s"[GLUTEN-DEBUG] Future thread managedJobTags: $futureManagedTags")
+      logError(s"[GLUTEN-DEBUG] Future thread threadUuid: $futureThreadUuid")
+      logError(
+        s"[GLUTEN-DEBUG] Future thread inherited tag 'one': " +
+          s"${futureManagedTags.contains("one")}")
+
+      session.range(1, 10000).map { i => Thread.sleep(100); i }.count()
+    }(ExecutionContext.global)
+
+    val acquired = sem.tryAcquire(1, 1, TimeUnit.MINUTES)
+    logError(s"[GLUTEN-DEBUG] Semaphore acquired: $acquired")
+    logError(s"[GLUTEN-DEBUG] All job tags observed: $jobTags")
+
+    val realTag = session.managedJobTags.get()("one")
+    logError(s"[GLUTEN-DEBUG] Looking for tag: $realTag")
+
+    val activeJobsFuture =
+      session.sparkContext.cancelJobsWithTagWithFuture(realTag, "reason")
+    val cancelledJobs = ThreadUtils.awaitResult(activeJobsFuture, 60.seconds)
+    logError(s"[GLUTEN-DEBUG] Cancelled jobs count: ${cancelledJobs.size}")
+    cancelledJobs.foreach(job => logError(s"[GLUTEN-DEBUG] Cancelled job: ${job.jobId}"))
+
+    assert(
+      cancelledJobs.nonEmpty,
+      s"Expected at least one cancelled job. " +
+        s"mainTags=$mainManagedTags, realTag=$realTag, observedJobTags=$jobTags")
+
+    val activeJob = cancelledJobs.head
+    val actualTags = activeJob.properties
+      .getProperty(SparkContext.SPARK_JOB_TAGS)
+      .split(SparkContext.SPARK_JOB_TAGS_SEP)
+    assert(
+      actualTags.toSet == Set(
+        session.sessionJobTag,
+        s"${session.sessionJobTag}-thread-${session.threadUuid.get()}-one",
+        SQLExecution.executionIdJobTag(
+          session,
+          activeJob.properties
+            .get(SQLExecution.EXECUTION_ROOT_ID_KEY)
+            .asInstanceOf[String]
+            .toLong)
+      ))
+  }
+}
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index c11d8da3b5..5a31f1d6f6 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -800,6 +800,7 @@ class VeloxTestSettings extends BackendTestSettings {
   // TODO: 4.x enableSuite[GlutenSetCommandSuite]  // 1 failure
   enableSuite[GlutenSparkSessionBuilderSuite]
   enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite]
+    .exclude("Tags set from session are prefixed with session UUID")
   enableSuite[GlutenTPCDSCollationQueryTestSuite]
   enableSuite[GlutenTPCDSModifiedPlanStabilitySuite]
   enableSuite[GlutenTPCDSModifiedPlanStabilityWithStatsSuite]
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala
index d24641fb1f..a1f336e56d 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala
@@ -16,6 +16,98 @@
  */
 package org.apache.spark.sql
 
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.util.ThreadUtils
+
+import java.util.concurrent.{Semaphore, TimeUnit}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
+
 class GlutenSparkSessionJobTaggingAndCancellationSuite
   extends SparkSessionJobTaggingAndCancellationSuite
-  with GlutenTestSetWithSystemPropertyTrait {}
+  with GlutenTestSetWithSystemPropertyTrait
+  with Logging {
+
+  testGluten("GLUTEN-DEBUG Tags set from session are prefixed with session UUID") {
+    sc = new SparkContext("local[2]", "test")
+    val session = classic.SparkSession.builder().sparkContext(sc).getOrCreate()
+    import session.implicits._
+
+    val mainThread = Thread.currentThread()
+    logError(
+      s"[GLUTEN-DEBUG] Main thread: ${mainThread.getName} " +
+        s"(id=${mainThread.getId}, class=${mainThread.getClass.getName})")
+
+    val sem = new Semaphore(0)
+    val jobTags = new java.util.concurrent.ConcurrentHashMap[Int, String]()
+    sc.addSparkListener(new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        val tags = Option(jobStart.properties.getProperty(SparkContext.SPARK_JOB_TAGS))
+          .getOrElse("<null>")
+        jobTags.put(jobStart.jobId, tags)
+        logError(s"[GLUTEN-DEBUG] onJobStart: jobId=${jobStart.jobId}, tags=$tags")
+        sem.release()
+      }
+    })
+
+    session.addTag("one")
+    val mainManagedTags = session.managedJobTags.get().toMap
+    val mainThreadUuid = session.threadUuid.get()
+    logError(s"[GLUTEN-DEBUG] Main thread managedJobTags: $mainManagedTags")
+    logError(s"[GLUTEN-DEBUG] Main thread threadUuid: $mainThreadUuid")
+
+    Future {
+      val futureThread = Thread.currentThread()
+      val futureManagedTags = session.managedJobTags.get().toMap
+      val futureThreadUuid = session.threadUuid.get()
+      logError(
+        s"[GLUTEN-DEBUG] Future thread: ${futureThread.getName} " +
+          s"(id=${futureThread.getId}, class=${futureThread.getClass.getName})")
+      logError(s"[GLUTEN-DEBUG] Future thread managedJobTags: $futureManagedTags")
+      logError(s"[GLUTEN-DEBUG] Future thread threadUuid: $futureThreadUuid")
+      logError(
+        s"[GLUTEN-DEBUG] Future thread inherited tag 'one': " +
+          s"${futureManagedTags.contains("one")}")
+
+      session.range(1, 10000).map { i => Thread.sleep(100); i }.count()
+    }(ExecutionContext.global)
+
+    val acquired = sem.tryAcquire(1, 1, TimeUnit.MINUTES)
+    logError(s"[GLUTEN-DEBUG] Semaphore acquired: $acquired")
+    logError(s"[GLUTEN-DEBUG] All job tags observed: $jobTags")
+
+    val realTag = session.managedJobTags.get()("one")
+    logError(s"[GLUTEN-DEBUG] Looking for tag: $realTag")
+
+    val activeJobsFuture =
+      session.sparkContext.cancelJobsWithTagWithFuture(realTag, "reason")
+    val cancelledJobs = ThreadUtils.awaitResult(activeJobsFuture, 60.seconds)
+    logError(s"[GLUTEN-DEBUG] Cancelled jobs count: ${cancelledJobs.size}")
+    cancelledJobs.foreach(job => logError(s"[GLUTEN-DEBUG] Cancelled job: ${job.jobId}"))
+
+    assert(
+      cancelledJobs.nonEmpty,
+      s"Expected at least one cancelled job. " +
+        s"mainTags=$mainManagedTags, realTag=$realTag, observedJobTags=$jobTags")
+
+    val activeJob = cancelledJobs.head
+    val actualTags = activeJob.properties
+      .getProperty(SparkContext.SPARK_JOB_TAGS)
+      .split(SparkContext.SPARK_JOB_TAGS_SEP)
+    assert(
+      actualTags.toSet == Set(
+        session.sessionJobTag,
+        s"${session.sessionJobTag}-thread-${session.threadUuid.get()}-one",
+        SQLExecution.executionIdJobTag(
+          session,
+          activeJob.properties
+            .get(SQLExecution.EXECUTION_ROOT_ID_KEY)
+            .asInstanceOf[String]
+            .toLong)
+      ))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to