This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3fd0cd609df6 [SPARK-47598][CORE] MLLib: Migrate logError with variables to structured logging framework 3fd0cd609df6 is described below commit 3fd0cd609df65051920c56861fa6da54caf4cc9e Author: panbingkun <panbing...@baidu.com> AuthorDate: Thu Apr 4 10:46:54 2024 -0700 [SPARK-47598][CORE] MLLib: Migrate logError with variables to structured logging framework ### What changes were proposed in this pull request? This PR migrates the `logError` calls in the `MLLib` module that interpolate variables to the structured logging framework. ### Why are the changes needed? To enhance Apache Spark's logging system by implementing structured logging. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Passed GitHub Actions (GA). ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45837 from panbingkun/SPARK-47598. Authored-by: panbingkun <panbing...@baidu.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../scala/org/apache/spark/internal/LogKey.scala | 7 +++++ .../scala/org/apache/spark/internal/Logging.scala | 2 +- .../apache/spark/ml/classification/LinearSVC.scala | 15 +++++----- .../ml/classification/LogisticRegression.scala | 14 ++++----- .../ml/regression/AFTSurvivalRegression.scala | 5 +--- .../spark/ml/regression/LinearRegression.scala | 5 +--- .../org/apache/spark/ml/util/Instrumentation.scala | 35 +++++++++++++++++++++- .../apache/spark/mllib/util/DataValidators.scala | 11 ++++--- .../org/apache/spark/mllib/util/MLUtils.scala | 10 ++++++- .../spark/ml/feature/VectorIndexerSuite.scala | 17 ++++++----- .../mllib/tree/GradientBoostedTreesSuite.scala | 20 +++++++++---- 11 files changed, 99 insertions(+), 42 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 608c0c6d521e..66f3b803c0d4 100644 --- 
a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -28,6 +28,7 @@ object LogKey extends Enumeration { val BLOCK_MANAGER_ID = Value val BROADCAST_ID = Value val BUCKET = Value + val CATEGORICAL_FEATURES = Value val CLASS_LOADER = Value val CLASS_NAME = Value val COMMAND = Value @@ -44,17 +45,22 @@ object LogKey extends Enumeration { val EXIT_CODE = Value val HOST = Value val JOB_ID = Value + val LEARNING_RATE = Value val LINE = Value val LINE_NUM = Value val MASTER_URL = Value val MAX_ATTEMPTS = Value + val MAX_CATEGORIES = Value val MAX_EXECUTOR_FAILURES = Value val MAX_SIZE = Value val MIN_SIZE = Value + val NUM_ITERATIONS = Value val OLD_BLOCK_MANAGER_ID = Value + val OPTIMIZER_CLASS_NAME = Value val PARTITION_ID = Value val PATH = Value val POD_ID = Value + val RANGE = Value val REASON = Value val REMOTE_ADDRESS = Value val RETRY_COUNT = Value @@ -63,6 +69,7 @@ object LogKey extends Enumeration { val SIZE = Value val STAGE_ID = Value val SUBMISSION_ID = Value + val SUBSAMPLING_RATE = Value val TASK_ATTEMPT_ID = Value val TASK_ID = Value val TASK_NAME = Value diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala index 84b9debb2afd..2132e166eacf 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala @@ -117,7 +117,7 @@ trait Logging { } } - private def withLogContext(context: java.util.HashMap[String, String])(body: => Unit): Unit = { + protected def withLogContext(context: java.util.HashMap[String, String])(body: => Unit): Unit = { val threadContext = CloseableThreadContext.putAll(context) try { body diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 
13898a304b3d..024693ba06f2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -25,7 +25,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{COUNT, RANGE} import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg._ import org.apache.spark.ml.optim.aggregator._ @@ -36,6 +37,7 @@ import org.apache.spark.ml.stat._ import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DatasetUtils._ import org.apache.spark.ml.util.Instrumentation.instrumented +import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.storage.StorageLevel @@ -220,10 +222,11 @@ class LinearSVC @Since("2.2.0") ( instr.logNumFeatures(numFeatures) if (numInvalid != 0) { - val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " + - s"Found $numInvalid invalid labels." + val msg = log"Classification labels should be in " + + log"${MDC(RANGE, s"[0 to ${numClasses - 1}]")}. " + + log"Found ${MDC(COUNT, numInvalid)} invalid labels." instr.logError(msg) - throw new SparkException(msg) + throw new SparkException(msg.message) } val featuresStd = summarizer.std.toArray @@ -249,9 +252,7 @@ class LinearSVC @Since("2.2.0") ( regularization, optimizer) if (rawCoefficients == null) { - val msg = s"${optimizer.getClass.getName} failed." 
- instr.logError(msg) - throw new SparkException(msg) + MLUtils.optimizerFailed(instr, optimizer.getClass) } val coefficientArray = Array.tabulate(numFeatures) { i => diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 8b796a65f4f8..0d487377b931 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -27,7 +27,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{COUNT, RANGE} import org.apache.spark.ml.feature._ import org.apache.spark.ml.impl.Utils import org.apache.spark.ml.linalg._ @@ -530,10 +531,11 @@ class LogisticRegression @Since("1.2.0") ( } if (numInvalid != 0) { - val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " + - s"Found $numInvalid invalid labels." + val msg = log"Classification labels should be in " + + log"${MDC(RANGE, s"[0 to ${numClasses - 1}]")}. " + + log"Found ${MDC(COUNT, numInvalid)} invalid labels." instr.logError(msg) - throw new SparkException(msg) + throw new SparkException(msg.message) } instr.logNumClasses(numClasses) @@ -634,9 +636,7 @@ class LogisticRegression @Since("1.2.0") ( initialSolution.toArray, regularization, optimizer) if (allCoefficients == null) { - val msg = s"${optimizer.getClass.getName} failed." 
- instr.logError(msg) - throw new SparkException(msg) + MLUtils.optimizerFailed(instr, optimizer.getClass) } val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index e9abcb095477..57d20bcd6f49 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -23,7 +23,6 @@ import breeze.linalg.{DenseVector => BDV} import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS} import org.apache.hadoop.fs.Path -import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.ml.PredictorParams @@ -271,9 +270,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S optimizer, initialSolution) if (rawCoefficients == null) { - val msg = s"${optimizer.getClass.getName} failed." 
- instr.logError(msg) - throw new SparkException(msg) + MLUtils.optimizerFailed(instr, optimizer.getClass) } val coefficientArray = Array.tabulate(numFeatures) { i => diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 9638eee8d590..d53b8b270f2d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -25,7 +25,6 @@ import breeze.stats.distributions.Rand.FixedSeed.randBasis import breeze.stats.distributions.StudentsT import org.apache.hadoop.fs.Path -import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.ml.{PipelineStage, PredictorParams} @@ -428,9 +427,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String featuresMean, featuresStd, initialSolution, regularization, optimizer) if (parameters == null) { - val msg = s"${optimizer.getClass.getName} failed." 
- instr.logError(msg) - throw new SparkException(msg) + MLUtils.optimizerFailed(instr, optimizer.getClass) } val model = createModel(parameters, yMean, yStd, featuresMean, featuresStd) diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala index a243ab8d27c9..bfc6465c58bd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala @@ -27,7 +27,7 @@ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LogEntry, Logging} import org.apache.spark.ml.{MLEvents, PipelineStage} import org.apache.spark.ml.param.{Param, Params} import org.apache.spark.rdd.RDD @@ -84,6 +84,17 @@ private[spark] class Instrumentation private () extends Logging with MLEvents { super.logWarning(prefix + msg) } + /** + * Logs a LogEntry which message with a prefix that uniquely identifies the training session. + */ + override def logWarning(entry: LogEntry): Unit = { + if (log.isWarnEnabled) { + withLogContext(entry.context) { + log.warn(prefix + entry.message) + } + } + } + /** * Logs a error message with a prefix that uniquely identifies the training session. */ @@ -91,6 +102,17 @@ private[spark] class Instrumentation private () extends Logging with MLEvents { super.logError(prefix + msg) } + /** + * Logs a LogEntry which message with a prefix that uniquely identifies the training session. + */ + override def logError(entry: LogEntry): Unit = { + if (log.isErrorEnabled) { + withLogContext(entry.context) { + log.error(prefix + entry.message) + } + } + } + /** * Logs an info message with a prefix that uniquely identifies the training session. 
*/ @@ -98,6 +120,17 @@ private[spark] class Instrumentation private () extends Logging with MLEvents { super.logInfo(prefix + msg) } + /** + * Logs a LogEntry which message with a prefix that uniquely identifies the training session. + */ + override def logInfo(entry: LogEntry): Unit = { + if (log.isInfoEnabled) { + withLogContext(entry.context) { + log.info(prefix + entry.message) + } + } + } + /** * Logs the value of the given parameters for the estimator being used in this session. */ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala index 1a8c8807f91d..d8c0f8711cab 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala @@ -18,7 +18,8 @@ package org.apache.spark.mllib.util import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{COUNT, RANGE} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD @@ -37,7 +38,8 @@ object DataValidators extends Logging { val binaryLabelValidator: RDD[LabeledPoint] => Boolean = { data => val numInvalid = data.filter(x => x.label != 1.0 && x.label != 0.0).count() if (numInvalid != 0) { - logError("Classification labels should be 0 or 1. Found " + numInvalid + " invalid labels") + logError(log"Classification labels should be 0 or 1. " + + log"Found ${MDC(COUNT, numInvalid)} invalid labels") } numInvalid == 0 } @@ -53,8 +55,9 @@ object DataValidators extends Logging { val numInvalid = data.filter(x => x.label - x.label.toInt != 0.0 || x.label < 0 || x.label > k - 1).count() if (numInvalid != 0) { - logError("Classification labels should be in {0 to " + (k - 1) + "}. 
" + - "Found " + numInvalid + " invalid labels") + logError(log"Classification labels should be in " + + log"${MDC(RANGE, s"[0 to ${k - 1}]")}. " + + log"Found ${MDC(COUNT, numInvalid)} invalid labels") } numInvalid == 0 } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 378f1381e4cf..10adf10690b7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -22,8 +22,10 @@ import scala.reflect.ClassTag import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.OPTIMIZER_CLASS_NAME import org.apache.spark.ml.linalg.{MatrixUDT => MLMatrixUDT, VectorUDT => MLVectorUDT} +import org.apache.spark.ml.util.Instrumentation import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.linalg.BLAS.dot import org.apache.spark.mllib.regression.LabeledPoint @@ -593,4 +595,10 @@ object MLUtils extends Logging { math.log1p(math.exp(x)) } } + + def optimizerFailed(instr: Instrumentation, optimizerClass: Class[_]): Unit = { + val msg = log"${MDC(OPTIMIZER_CLASS_NAME, optimizerClass.getName)} failed." 
+ instr.logError(msg) + throw new SparkException(msg.message) + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala index aa7b4e17a4df..0a9347b87977 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.ml.feature import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CATEGORICAL_FEATURES, MAX_CATEGORIES} import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.{SparseVector, Vector, Vectors} import org.apache.spark.ml.param.ParamsSuite @@ -175,8 +176,10 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging { maxCategories: Int, categoricalFeatures: Set[Int]): Unit = { val collectedData = data.collect().map(_.getAs[Vector](0)) - val errMsg = s"checkCategoryMaps failed for input with maxCategories=$maxCategories," + - s" categoricalFeatures=${categoricalFeatures.mkString(", ")}" + + val errMsg = log"checkCategoryMaps failed for input with " + + log"maxCategories=${MDC(MAX_CATEGORIES, maxCategories)} " + + log"categoricalFeatures=${MDC(CATEGORICAL_FEATURES, categoricalFeatures.mkString(", "))}" try { val vectorIndexer = getIndexer.setMaxCategories(maxCategories) val model = vectorIndexer.fit(data) @@ -210,8 +213,8 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging { assert(attr.values.get === origValueSet.toArray.sorted.map(_.toString)) assert(attr.isOrdinal.get === false) case _ => - throw new RuntimeException(errMsg + s". Categorical feature $feature failed" + - s" metadata check. Found feature attribute: $featureAttr.") + throw new RuntimeException(errMsg.message + s". 
Categorical feature $feature " + + s"failed metadata check. Found feature attribute: $featureAttr.") } } // Check numerical feature metadata. @@ -222,8 +225,8 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging { case attr: NumericAttribute => assert(featureAttr.index.get === feature) case _ => - throw new RuntimeException(errMsg + s". Numerical feature $feature failed" + - s" metadata check. Found feature attribute: $featureAttr.") + throw new RuntimeException(errMsg.message + s". Numerical feature $feature " + + s"failed metadata check. Found feature attribute: $featureAttr.") } } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala index 5cf51c252cd5..f5c6abfc66f2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.mllib.tree import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.{MDC, MessageWithContext} +import org.apache.spark.internal.LogKey.{LEARNING_RATE, NUM_ITERATIONS, SUBSAMPLING_RATE} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Strategy} import org.apache.spark.mllib.tree.configuration.Algo._ @@ -33,6 +35,15 @@ import org.apache.spark.util.Utils */ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext { + private def buildErrorLog( + numIterations: Int, + learningRate: Double, + subsamplingRate: Double): MessageWithContext = { + log"FAILED for numIterations=${MDC(NUM_ITERATIONS, numIterations)}, " + + log"learningRate=${MDC(LEARNING_RATE, learningRate)}, " + + log"subsamplingRate=${MDC(SUBSAMPLING_RATE, subsamplingRate)}" + } + test("Regression with continuous features: SquaredError") { 
GradientBoostedTreesSuite.testCombinations.foreach { case (numIterations, learningRate, subsamplingRate) => @@ -51,8 +62,7 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.06) } catch { case e: java.lang.AssertionError => - logError(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," + - s" subsamplingRate=$subsamplingRate") + logError(buildErrorLog(numIterations, learningRate, subsamplingRate)) throw e } @@ -82,8 +92,7 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.85, "mae") } catch { case e: java.lang.AssertionError => - logError(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," + - s" subsamplingRate=$subsamplingRate") + logError(buildErrorLog(numIterations, learningRate, subsamplingRate)) throw e } @@ -114,8 +123,7 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.9) } catch { case e: java.lang.AssertionError => - logError(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," + - s" subsamplingRate=$subsamplingRate") + logError(buildErrorLog(numIterations, learningRate, subsamplingRate)) throw e } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org