This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 52fd8c01cc8b [SPARK-47595][STREAMING] Streaming: Migrate logError with variables to structured logging framework
52fd8c01cc8b is described below

commit 52fd8c01cc8b2a6ce1db3e059b0b962d258f4342
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Wed Apr 10 15:21:13 2024 -0700

    [SPARK-47595][STREAMING] Streaming: Migrate logError with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    This PR migrates `logError` calls with variables in the `Streaming` module to
    the structured logging framework.
    
    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Pass GA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45910 from panbingkun/SPARK-47595.
    
    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../utils/src/main/scala/org/apache/spark/internal/LogKey.scala   | 2 ++
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala     | 8 +++++---
 .../org/apache/spark/streaming/receiver/ReceiverSupervisor.scala  | 8 +++++---
 .../apache/spark/streaming/scheduler/ReceivedBlockTracker.scala   | 6 ++++--
 .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala    | 6 ++++--
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala  | 5 +++--
 .../test/scala/org/apache/spark/streaming/MasterFailureTest.scala | 5 +++--
 7 files changed, 26 insertions(+), 14 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala 
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 6cdec011e2ae..a9a79de05c27 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -121,6 +121,7 @@ object LogKey extends Enumeration {
   val RANGE = Value
   val RDD_ID = Value
   val REASON = Value
+  val RECEIVED_BLOCK_INFO = Value
   val REDUCE_ID = Value
   val RELATION_NAME = Value
   val REMAINING_PARTITIONS = Value
@@ -143,6 +144,7 @@ object LogKey extends Enumeration {
   val STAGE_ID = Value
   val STATEMENT_ID = Value
   val STATUS = Value
+  val STREAM_ID = Value
   val STREAM_NAME = Value
   val SUBMISSION_ID = Value
   val SUBSAMPLING_RATE = Value
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 414fdf5d619d..e301311c922a 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
+import org.apache.spark.internal.LogKey.PATH
+import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.scheduler.StreamInputInfo
@@ -288,9 +290,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         case None => context.sparkContext.newAPIHadoopFile[K, V, F](file)
       }
       if (rdd.partitions.isEmpty) {
-        logError("File " + file + " has no data in it. Spark Streaming can 
only ingest " +
-          "files that have been \"moved\" to the directory assigned to the 
file stream. " +
-          "Refer to the streaming programming guide for more details.")
+        logError(log"File ${MDC(PATH, file)} has no data in it. Spark 
Streaming can only ingest " +
+          log"""files that have been "moved" to the directory assigned to the 
file stream. """ +
+          log"Refer to the streaming programming guide for more details.")
       }
       rdd
     }
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 672452a4af4f..15f346484864 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -25,7 +25,8 @@ import scala.concurrent._
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, STREAM_ID}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.util.{ThreadUtils, Utils}
 
@@ -175,7 +176,8 @@ private[streaming] abstract class ReceiverSupervisor(
       }
     } catch {
       case NonFatal(t) =>
-        logError(s"Error stopping receiver $streamId 
${Utils.exceptionString(t)}")
+        logError(log"Error stopping receiver ${MDC(STREAM_ID, streamId)} " +
+          log"${MDC(ERROR, Utils.exceptionString(t))}")
     }
   }
 
@@ -218,7 +220,7 @@ private[streaming] abstract class ReceiverSupervisor(
     logInfo("Waiting for receiver to be stopped")
     stopLatch.await()
     if (stoppingError != null) {
-      logError("Stopped receiver with error: " + stoppingError)
+      logError(log"Stopped receiver with error: ${MDC(ERROR, stoppingError)}")
       throw stoppingError
     } else {
       logInfo("Stopped receiver without error")
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index a14112e47382..bdb910337241 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -27,7 +27,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.RECEIVED_BLOCK_INFO
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils}
@@ -99,7 +100,8 @@ private[streaming] class ReceivedBlockTracker(
       writeResult
     } catch {
       case NonFatal(e) =>
-        logError(s"Error adding block $receivedBlockInfo", e)
+        logError(
+          log"Error adding block ${MDC(RECEIVED_BLOCK_INFO, 
receivedBlockInfo)}", e)
         false
     }
   }
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 52ea8fdfbfef..81c6264234f4 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -24,7 +24,8 @@ import scala.concurrent.ExecutionContext
 import scala.util.{Failure, Success}
 
 import org.apache.spark._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, STREAM_ID}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}
@@ -330,7 +331,8 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
     } else {
       s"$message"
     }
-    logError(s"Deregistered receiver for stream $streamId: $messageWithError")
+    logError(log"Deregistered receiver for stream ${MDC(STREAM_ID, streamId)}: 
" +
+      log"${MDC(ERROR, messageWithError)}")
   }
 
   /** Update a receiver's maximum ingestion rate */
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index a2e29a1cfa00..3d93f045a5ec 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -31,7 +31,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.RETRY_COUNT
 import org.apache.spark.util.{CompletionIterator, ThreadUtils}
 import org.apache.spark.util.ArrayImplicits._
 
@@ -106,7 +107,7 @@ private[streaming] class FileBasedWriteAheadLog(
       }
     }
     if (fileSegment == null) {
-      logError(s"Failed to write to write ahead log after $failures failures")
+      logError(log"Failed to write to write ahead log after ${MDC(RETRY_COUNT, 
failures)} failures")
       throw lastException
     }
     fileSegment
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 0805f76fd36f..afe17936043a 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -32,7 +32,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.scalatest.Assertions._
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
 
@@ -392,7 +393,7 @@ class FileGeneratingThread(input: Seq[String], testDir: 
Path, interval: Long)
           }
         }
         if (!done) {
-          logError("Could not generate file " + hadoopFile)
+          logError(log"Could not generate file ${MDC(PATH, hadoopFile)}")
         } else {
           logInfo("Generated file " + hadoopFile + " at " + 
System.currentTimeMillis)
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to