This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 52fd8c01cc8b [SPARK-47595][STREAMING] Streaming: Migrate logError with variables to structured logging framework
52fd8c01cc8b is described below

commit 52fd8c01cc8b2a6ce1db3e059b0b962d258f4342
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Wed Apr 10 15:21:13 2024 -0700

    [SPARK-47595][STREAMING] Streaming: Migrate logError with variables to structured logging framework

    ### What changes were proposed in this pull request?
    This PR migrates `logError` calls with variables in the `Streaming` module to the structured logging framework.

    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    - Pass GA.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #45910 from panbingkun/SPARK-47595.

    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
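For quick reference, the migration pattern is sketched below. This is a minimal illustration, not code from the commit: the `ReceiverErrorLogger` class and `onStopError` method are hypothetical, while the imports and the `log"..."`/`MDC` usage mirror the `ReceiverSupervisor` hunk in the diff.

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{ERROR, STREAM_ID}

// Hypothetical class for illustration; any class extending Logging
// gains the log"..." interpolator the same way.
class ReceiverErrorLogger extends Logging {
  def onStopError(streamId: Int, t: Throwable): Unit = {
    // Before: plain string interpolation buries the values in free text:
    //   logError(s"Error stopping receiver $streamId: ${t.getMessage}")
    // After: each variable is wrapped in MDC(<LogKey>, value) inside the
    // log"..." interpolator, so it is emitted as a named, queryable field.
    logError(log"Error stopping receiver ${MDC(STREAM_ID, streamId)} " +
      log"${MDC(ERROR, t.getMessage)}")
  }
}
```

Note that every key used with `MDC` must already exist in `LogKey`; the first hunk below registers the two new keys this PR needs, `RECEIVED_BLOCK_INFO` and `STREAM_ID`.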
" + - "Refer to the streaming programming guide for more details.") + logError(log"File ${MDC(PATH, file)} has no data in it. Spark Streaming can only ingest " + + log"""files that have been "moved" to the directory assigned to the file stream. """ + + log"Refer to the streaming programming guide for more details.") } rdd } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 672452a4af4f..15f346484864 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -25,7 +25,8 @@ import scala.concurrent._ import scala.util.control.NonFatal import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ERROR, STREAM_ID} import org.apache.spark.storage.StreamBlockId import org.apache.spark.util.{ThreadUtils, Utils} @@ -175,7 +176,8 @@ private[streaming] abstract class ReceiverSupervisor( } } catch { case NonFatal(t) => - logError(s"Error stopping receiver $streamId ${Utils.exceptionString(t)}") + logError(log"Error stopping receiver ${MDC(STREAM_ID, streamId)} " + + log"${MDC(ERROR, Utils.exceptionString(t))}") } } @@ -218,7 +220,7 @@ private[streaming] abstract class ReceiverSupervisor( logInfo("Waiting for receiver to be stopped") stopLatch.await() if (stoppingError != null) { - logError("Stopped receiver with error: " + stoppingError) + logError(log"Stopped receiver with error: ${MDC(ERROR, stoppingError)}") throw stoppingError } else { logInfo("Stopped receiver without error") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index a14112e47382..bdb910337241 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -27,7 +27,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.RECEIVED_BLOCK_INFO import org.apache.spark.network.util.JavaUtils import org.apache.spark.streaming.Time import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils} @@ -99,7 +100,8 @@ private[streaming] class ReceivedBlockTracker( writeResult } catch { case NonFatal(e) => - logError(s"Error adding block $receivedBlockInfo", e) + logError( + log"Error adding block ${MDC(RECEIVED_BLOCK_INFO, receivedBlockInfo)}", e) false } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 52ea8fdfbfef..81c6264234f4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -24,7 +24,8 @@ import scala.concurrent.ExecutionContext import scala.util.{Failure, Success} import org.apache.spark._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import 
org.apache.spark.internal.LogKey.{ERROR, STREAM_ID} import org.apache.spark.rdd.RDD import org.apache.spark.rpc._ import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation} @@ -330,7 +331,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } else { s"$message" } - logError(s"Deregistered receiver for stream $streamId: $messageWithError") + logError(log"Deregistered receiver for stream ${MDC(STREAM_ID, streamId)}: " + + log"${MDC(ERROR, messageWithError)}") } /** Update a receiver's maximum ingestion rate */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index a2e29a1cfa00..3d93f045a5ec 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -31,7 +31,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.RETRY_COUNT import org.apache.spark.util.{CompletionIterator, ThreadUtils} import org.apache.spark.util.ArrayImplicits._ @@ -106,7 +107,7 @@ private[streaming] class FileBasedWriteAheadLog( } } if (fileSegment == null) { - logError(s"Failed to write to write ahead log after $failures failures") + logError(log"Failed to write to write ahead log after ${MDC(RETRY_COUNT, failures)} failures") throw lastException } fileSegment diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala index 0805f76fd36f..afe17936043a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala @@ -32,7 +32,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.scalatest.Assertions._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.streaming.dstream.DStream import org.apache.spark.util.Utils @@ -392,7 +393,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) } } if (!done) { - logError("Could not generate file " + hadoopFile) + logError(log"Could not generate file ${MDC(PATH, hadoopFile)}") } else { logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org