This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 520f3b1c192b [SPARK-47593][CORE] Connector module: Migrate logWarn with variables to structured logging framework 520f3b1c192b is described below commit 520f3b1c192b1bae53509fdad770f5711ca3791f Author: panbingkun <panbing...@baidu.com> AuthorDate: Tue Apr 9 21:42:39 2024 -0700 [SPARK-47593][CORE] Connector module: Migrate logWarn with variables to structured logging framework ### What changes were proposed in this pull request? This PR migrates the `logWarning` calls that contain variables in the `Connector` module to the structured logging framework. ### Why are the changes needed? To enhance Apache Spark's logging system by implementing structured logging. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45879 from panbingkun/SPARK-47593_warning. Lead-authored-by: panbingkun <panbing...@baidu.com> Co-authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../scala/org/apache/spark/internal/LogKey.scala | 23 +++++++++++++ .../scala/org/apache/spark/util/MDCSuite.scala | 15 +++++++- .../org/apache/spark/sql/avro/AvroUtils.scala | 9 ++--- .../ExecutePlanResponseReattachableIterator.scala | 2 +- .../sql/connect/client/GrpcRetryHandler.scala | 18 +++++----- .../execution/ExecuteGrpcResponseSender.scala | 15 +++++--- .../service/SparkConnectStreamingQueryCache.scala | 14 +++++--- .../connect/ui/SparkConnectServerListener.scala | 36 +++++++++++++------ .../sql/jdbc/DockerJDBCIntegrationSuite.scala | 7 ++-- .../spark/sql/kafka010/KafkaContinuousStream.scala | 5 +-- .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 5 +-- .../sql/kafka010/KafkaOffsetReaderAdmin.scala | 10 +++--- .../sql/kafka010/KafkaOffsetReaderConsumer.scala | 10 +++--- .../apache/spark/sql/kafka010/KafkaSource.scala | 5 +-- 
.../sql/kafka010/consumer/FetchedDataPool.scala | 7 ++-- .../sql/kafka010/consumer/KafkaDataConsumer.scala | 40 +++++++++++++--------- .../producer/InternalKafkaProducerPool.scala | 6 ++-- .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 9 ++--- .../kafka010/KafkaDelegationTokenProvider.scala | 10 +++--- .../streaming/kafka010/ConsumerStrategy.scala | 11 +++--- .../streaming/kafka010/KafkaDataConsumer.scala | 7 ++-- .../spark/streaming/kafka010/KafkaUtils.scala | 16 +++++---- .../streaming/kinesis/KinesisBackedBlockRDD.scala | 6 ++-- .../streaming/kinesis/KinesisRecordProcessor.scala | 2 +- .../spark/streaming/kinesis/KinesisTestUtils.scala | 7 ++-- 25 files changed, 195 insertions(+), 100 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 2cb5eac4548c..6cdec011e2ae 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -34,6 +34,7 @@ object LogKey extends Enumeration { val CATEGORICAL_FEATURES = Value val CLASS_LOADER = Value val CLASS_NAME = Value + val CLUSTER_ID = Value val COLUMN_DATA_TYPE_SOURCE = Value val COLUMN_DATA_TYPE_TARGET = Value val COLUMN_DEFAULT_VALUE = Value @@ -43,6 +44,7 @@ object LogKey extends Enumeration { val COMPONENT = Value val CONFIG = Value val CONFIG2 = Value + val CONTAINER = Value val CONTAINER_ID = Value val COUNT = Value val CSV_HEADER_COLUMN_NAME = Value @@ -51,6 +53,7 @@ object LogKey extends Enumeration { val CSV_SCHEMA_FIELD_NAME = Value val CSV_SCHEMA_FIELD_NAMES = Value val CSV_SOURCE = Value + val DATA = Value val DATABASE_NAME = Value val DRIVER_ID = Value val DROPPED_PARTITIONS = Value @@ -70,9 +73,11 @@ object LogKey extends Enumeration { val HIVE_OPERATION_STATE = Value val HIVE_OPERATION_TYPE = Value val HOST = Value + val INDEX = Value val JOB_ID = Value val JOIN_CONDITION = Value val 
JOIN_CONDITION_SUB_EXPRESSION = Value + val KEY = Value val LEARNING_RATE = Value val LINE = Value val LINE_NUM = Value @@ -80,17 +85,23 @@ object LogKey extends Enumeration { val LOG_TYPE = Value val MASTER_URL = Value val MAX_ATTEMPTS = Value + val MAX_CAPACITY = Value val MAX_CATEGORIES = Value val MAX_EXECUTOR_FAILURES = Value val MAX_SIZE = Value val MERGE_DIR_NAME = Value val METHOD_NAME = Value val MIN_SIZE = Value + val NEW_VALUE = Value val NUM_COLUMNS = Value val NUM_ITERATIONS = Value val OBJECT_ID = Value + val OFFSET = Value + val OFFSETS = Value val OLD_BLOCK_MANAGER_ID = Value + val OLD_VALUE = Value val OPTIMIZER_CLASS_NAME = Value + val OP_ID = Value val OP_TYPE = Value val PARSE_MODE = Value val PARTITION_ID = Value @@ -99,8 +110,11 @@ object LogKey extends Enumeration { val PATH = Value val PATHS = Value val POD_ID = Value + val POLICY = Value val PORT = Value + val PRODUCER_ID = Value val QUERY_HINT = Value + val QUERY_ID = Value val QUERY_PLAN = Value val QUERY_PLAN_LENGTH_ACTUAL = Value val QUERY_PLAN_LENGTH_MAX = Value @@ -117,6 +131,7 @@ object LogKey extends Enumeration { val RULE_BATCH_NAME = Value val RULE_NAME = Value val RULE_NUMBER_OF_RUNS = Value + val SERVICE_NAME = Value val SESSION_ID = Value val SHARD_ID = Value val SHUFFLE_BLOCK_INFO = Value @@ -127,6 +142,8 @@ object LogKey extends Enumeration { val SQL_TEXT = Value val STAGE_ID = Value val STATEMENT_ID = Value + val STATUS = Value + val STREAM_NAME = Value val SUBMISSION_ID = Value val SUBSAMPLING_RATE = Value val TABLE_NAME = Value @@ -140,13 +157,19 @@ object LogKey extends Enumeration { val TID = Value val TIMEOUT = Value val TIME_UNITS = Value + val TIP = Value + val TOPIC_PARTITION = Value val TOTAL_EFFECTIVE_TIME = Value val TOTAL_TIME = Value val UNSUPPORTED_EXPRESSION = Value val UNSUPPORTED_HINT_REASON = Value + val UNTIL_OFFSET = Value val URI = Value val USER_ID = Value val USER_NAME = Value + val WAIT_RESULT_TIME = Value + val WAIT_SEND_TIME = Value + val WAIT_TIME 
= Value val WATERMARK_CONSTRAINT = Value val WORKER_URL = Value val XSD_PATH = Value diff --git a/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala index 1ac51e236080..62eaa852913c 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala @@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._ import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.EXIT_CODE +import org.apache.spark.internal.LogKey.{EXIT_CODE, OFFSET, RANGE} class MDCSuite extends AnyFunSuite // scalastyle:ignore funsuite @@ -41,6 +41,19 @@ class MDCSuite assert(log.context === Map("exit_code" -> "CustomObjectValue: spark, 10086").asJava) } + test("check MDC stripMargin") { + val log = + log""" + |The current available offset range is ${MDC(RANGE, "12 - 34")}. + | Offset ${MDC(OFFSET, "666")}. is out of range""".stripMargin + val expected = + s""" + |The current available offset range is 12 - 34. + | Offset 666. 
is out of range""".stripMargin + assert(log.message === expected) + assert(log.context === Map("range" -> "12 - 34", "offset" -> "666").asJava) + } + case class CustomObjectValue(key: String, value: Int) { override def toString: String = { "CustomObjectValue: " + key + ", " + value diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index 269a8b80c2b7..e8be11f48a2b 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -31,7 +31,8 @@ import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkException, SparkIllegalArgumentException} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CONFIG, PATH} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.AvroCompressionCodec._ import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION @@ -51,8 +52,8 @@ private[sql] object AvroUtils extends Logging { val parsedOptions = new AvroOptions(options, conf) if (parsedOptions.parameters.contains(IGNORE_EXTENSION)) { - logWarning(s"Option $IGNORE_EXTENSION is deprecated. Please use the " + - "general data source option pathGlobFilter for filtering file names.") + logWarning(log"Option ${MDC(CONFIG, IGNORE_EXTENSION)} is deprecated. Please use the " + + log"general data source option pathGlobFilter for filtering file names.") } // User can specify an optional avro json schema. 
val avroSchema = parsedOptions.schema @@ -160,7 +161,7 @@ private[sql] object AvroUtils extends Logging { } catch { case e: IOException => if (ignoreCorruptFiles) { - logWarning(s"Skipped the footer in the corrupted file: $path", e) + logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, path)}", e) None } else { throw new SparkException(s"Could not read file: $path", e) diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala index 5854a9225dbe..74f13272a365 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala @@ -277,7 +277,7 @@ class ExecutePlanResponseReattachableIterator( } } catch { case NonFatal(e) => - logWarning(s"ReleaseExecute failed with exception: $e.") + logWarning(log"ReleaseExecute failed with exception:", e) } } } diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala index 2418dfa03505..508dad3d748d 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala @@ -22,6 +22,8 @@ import scala.util.control.NonFatal import io.grpc.stub.StreamObserver import org.apache.spark.internal.Logging +import org.apache.spark.internal.LogKey.{ERROR, POLICY, RETRY_COUNT, WAIT_TIME} +import org.apache.spark.internal.MDC private[sql] class GrpcRetryHandler( private val policies: Seq[RetryPolicy], @@ -187,8 +189,8 @@ private[sql] object 
GrpcRetryHandler extends Logging { if (lastException.isInstanceOf[RetryException]) { // retry exception is considered immediately retriable without any policies. logWarning( - s"Non-Fatal error during RPC execution: $lastException, retrying " + - s"(currentRetryNum=$currentRetryNum)") + log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " + + log"retrying (currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)})") return } @@ -197,18 +199,18 @@ private[sql] object GrpcRetryHandler extends Logging { if (time.isDefined) { logWarning( - s"Non-Fatal error during RPC execution: $lastException, retrying " + - s"(wait=${time.get.toMillis}, currentRetryNum=$currentRetryNum, " + - s"policy: ${policy.getName})") - + log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " + + log"retrying (wait=${MDC(WAIT_TIME, time.get.toMillis)} ms, " + + log"currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)}, " + + log"policy=${MDC(POLICY, policy.getName)}).") sleep(time.get.toMillis) return } } logWarning( - s"Non-Fatal error during RPC execution: $lastException, exceeded retries " + - s"(currentRetryNum=$currentRetryNum)") + log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " + + log"exceeded retries (currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)})") val error = new RetriesExceeded() exceptionList.foreach(error.addSuppressed) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala index 4b95f38c6695..1139507a37a5 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala @@ -24,7 +24,9 @@ import io.grpc.stub.{ServerCallStreamObserver, StreamObserver} import 
org.apache.spark.{SparkEnv, SparkSQLException} import org.apache.spark.connect.proto.ExecutePlanResponse -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{INDEX, OP_ID, TOTAL_TIME, WAIT_RESULT_TIME, WAIT_SEND_TIME} +import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS import org.apache.spark.sql.connect.common.ProtoUtils import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION, CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE, CONNECT_PROGRESS_REPORT_INTERVAL} import org.apache.spark.sql.connect.service.{ExecuteHolder, SparkConnectService} @@ -268,10 +270,15 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( // Process the outcome of the inner loop. if (interrupted) { // This sender got interrupted. Kill this RPC. + val totalTime = (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble + val waitResultTime = consumeSleep / NANOS_PER_MILLIS.toDouble + val waitSendTime = sendSleep / NANOS_PER_MILLIS.toDouble logWarning( - s"Got detached from opId=${executeHolder.operationId} at index ${nextIndex - 1}." + - s"totalTime=${System.nanoTime - startTime}ns " + - s"waitingForResults=${consumeSleep}ns waitingForSend=${sendSleep}ns") + log"Got detached from opId=${MDC(OP_ID, executeHolder.operationId)} " + + log"at index ${MDC(INDEX, nextIndex - 1)}." 
+ + log"totalTime=${MDC(TOTAL_TIME, totalTime)} ms " + + log"waitingForResults=${MDC(WAIT_RESULT_TIME, waitResultTime)} ms " + + log"waitingForSend=${MDC(WAIT_SEND_TIME, waitSendTime)} ms") throw new SparkSQLException(errorClass = "INVALID_CURSOR.DISCONNECTED", Map.empty) } else if (gotResponse) { enqueueProgressMessage() diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala index a5d3fa497bb3..9690d10eba1a 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala @@ -26,7 +26,8 @@ import scala.collection.mutable import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} import scala.util.control.NonFatal -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{NEW_VALUE, OLD_VALUE, QUERY_ID} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} @@ -65,9 +66,9 @@ private[connect] class SparkConnectStreamingQueryCache( queryCache.put(QueryCacheKey(query.id.toString, query.runId.toString), value) match { case Some(existing) => // Query is being replace. Not really expected. - logWarning( - s"Replacing existing query in the cache (unexpected). Query Id: ${query.id}." + - s"Existing value $existing, new value $value.") + logWarning(log"Replacing existing query in the cache (unexpected). " + + log"Query Id: ${MDC(QUERY_ID, query.id)}.Existing value ${MDC(OLD_VALUE, existing)}, " + + log"new value ${MDC(NEW_VALUE, value)}.") case None => logInfo(s"Adding new query to the cache. 
Query Id ${query.id}, value $value.") } @@ -115,7 +116,10 @@ private[connect] class SparkConnectStreamingQueryCache( v.query.stop() } catch { case NonFatal(ex) => - logWarning(s"Failed to stop the query ${k.queryId}. Error is ignored.", ex) + logWarning( + log"Failed to stop the query ${MDC(QUERY_ID, k.queryId)}. " + + log"Error is ignored.", + ex) } } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala index ec6079a89145..a1bbab7dbdbc 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala @@ -21,7 +21,8 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkConf, SparkContext, SparkEnv} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{OP_ID, SESSION_ID} import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD import org.apache.spark.scheduler._ import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT} @@ -183,8 +184,8 @@ private[connect] class SparkConnectServerListener( updateLiveStore(sessionData) case None => logWarning( - s"onOperationStart called with unknown session id: ${e.sessionId}." + - s"Regardless, the operation has been registered.") + log"onOperationStart called with unknown session id: ${MDC(SESSION_ID, e.sessionId)}." 
+ + log"Regardless, the operation has been registered.") } } @@ -194,7 +195,9 @@ private[connect] class SparkConnectServerListener( executionData.state = ExecutionState.COMPILED updateLiveStore(executionData) case None => - logWarning(s"onOperationAnalyzed called with unknown operation id: ${e.jobTag}") + logWarning( + log"onOperationAnalyzed called with " + + log"unknown operation id: ${MDC(OP_ID, e.jobTag)}") } } @@ -205,7 +208,9 @@ private[connect] class SparkConnectServerListener( executionData.state = ExecutionState.READY updateLiveStore(executionData) case None => - logWarning(s"onOperationReadyForExecution called with unknown operation id: ${e.jobTag}") + logWarning( + log"onOperationReadyForExecution called with " + + log"unknown operation id: ${MDC(OP_ID, e.jobTag)}") } } @@ -216,7 +221,9 @@ private[connect] class SparkConnectServerListener( executionData.state = ExecutionState.CANCELED updateLiveStore(executionData) case None => - logWarning(s"onOperationCanceled called with unknown operation id: ${e.jobTag}") + logWarning( + log"onOperationCanceled called with " + + log"unknown operation id: ${MDC(OP_ID, e.jobTag)}") } } private def onOperationFailed(e: SparkListenerConnectOperationFailed) = synchronized { @@ -227,7 +234,9 @@ private[connect] class SparkConnectServerListener( executionData.state = ExecutionState.FAILED updateLiveStore(executionData) case None => - logWarning(s"onOperationFailed called with unknown operation id: ${e.jobTag}") + logWarning( + log"onOperationFailed called with " + + log"unknown operation id: ${MDC(OP_ID, e.jobTag)}") } } private def onOperationFinished(e: SparkListenerConnectOperationFinished) = synchronized { @@ -237,7 +246,9 @@ private[connect] class SparkConnectServerListener( executionData.state = ExecutionState.FINISHED updateLiveStore(executionData) case None => - logWarning(s"onOperationFinished called with unknown operation id: ${e.jobTag}") + logWarning( + log"onOperationFinished called with " + + log"unknown 
operation id: ${MDC(OP_ID, e.jobTag)}") } } private def onOperationClosed(e: SparkListenerConnectOperationClosed) = synchronized { @@ -248,7 +259,9 @@ private[connect] class SparkConnectServerListener( updateStoreWithTriggerEnabled(executionData) executionList.remove(e.jobTag) case None => - logWarning(s"onOperationClosed called with unknown operation id: ${e.jobTag}") + logWarning( + log"onOperationClosed called with " + + log"unknown operation id: ${MDC(OP_ID, e.jobTag)}") } } @@ -265,7 +278,10 @@ private[connect] class SparkConnectServerListener( updateStoreWithTriggerEnabled(sessionData) sessionList.remove(e.sessionId) - case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}") + case None => + logWarning( + log"onSessionClosed called with " + + log"unknown session id: ${MDC(SESSION_ID, e.sessionId)}") } } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala index 8e65e8af9d7a..fc095c5f5b31 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala @@ -36,7 +36,7 @@ import com.github.dockerjava.zerodep.ZerodepDockerHttpClient import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.SpanSugar._ -import org.apache.spark.internal.LogKey.CLASS_NAME +import org.apache.spark.internal.LogKey.{CLASS_NAME, CONTAINER, STATUS} import org.apache.spark.internal.MDC import org.apache.spark.sql.QueryTest import org.apache.spark.sql.test.SharedSparkSession @@ -263,9 +263,10 @@ abstract class DockerJDBCIntegrationSuite } catch { case NonFatal(e) => val response = docker.inspectContainerCmd(container.getId).exec() - logWarning(s"Container $container already 
stopped") + logWarning(log"Container ${MDC(CONTAINER, container)} already stopped") val status = Option(response).map(_.getState.getStatus).getOrElse("unknown") - logWarning(s"Could not stop container $container at stage '$status'", e) + logWarning(log"Could not stop container ${MDC(CONTAINER, container)} " + + log"at stage '${MDC(STATUS, status)}'", e) } finally { logContainerOutput() docker.removeContainerCmd(container.getId).exec() diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index 9b7f52585545..9bf0a2e9e513 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -24,7 +24,8 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, Offset import org.apache.kafka.common.TopicPartition import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ERROR, TIP} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.connector.read.InputPartition @@ -148,7 +149,7 @@ class KafkaContinuousStream( if (failOnDataLoss) { throw getException() } else { - logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE") + logWarning(log"${MDC(ERROR, message)}. 
${MDC(TIP, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}") } } } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index fefa3efcc353..be838ddc3c80 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkEnv -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ERROR, TIP} import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} @@ -309,7 +310,7 @@ private[kafka010] class KafkaMicroBatchStream( if (failOnDataLoss) { throw getException() } else { - logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE") + logWarning(log"${MDC(ERROR, message)}. 
${MDC(TIP, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}") } } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala index 27adccf6f902..433da08176e7 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala @@ -30,7 +30,8 @@ import org.apache.kafka.common.{IsolationLevel, TopicPartition} import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.spark.SparkEnv -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT} import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset @@ -335,8 +336,8 @@ private[kafka010] class KafkaOffsetReaderAdmin( incorrectOffsets = findIncorrectOffsets() if (incorrectOffsets.nonEmpty) { - logWarning("Found incorrect offsets in some partitions " + - s"(partition, previous offset, fetched offset): $incorrectOffsets") + logWarning(log"Found incorrect offsets in some partitions " + + log"(partition, previous offset, fetched offset): ${MDC(OFFSETS, incorrectOffsets)}") if (attempt < maxOffsetFetchAttempts) { logWarning("Retrying to fetch latest offsets because of incorrect offsets") Thread.sleep(offsetFetchAttemptIntervalMs) @@ -534,7 +535,8 @@ private[kafka010] class KafkaOffsetReaderAdmin( } catch { case NonFatal(e) => lastException = e - logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e) + logWarning( + log"Error in attempt ${MDC(RETRY_COUNT, attempt)} getting Kafka offsets: ", e) attempt += 1 Thread.sleep(offsetFetchAttemptIntervalMs) resetAdmin() 
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala index d4953a4a65e3..2ba4a9a563df 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala @@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, OffsetAndTim import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkEnv -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT} import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset @@ -385,8 +386,8 @@ private[kafka010] class KafkaOffsetReaderConsumer( incorrectOffsets = findIncorrectOffsets() if (incorrectOffsets.nonEmpty) { - logWarning("Found incorrect offsets in some partitions " + - s"(partition, previous offset, fetched offset): $incorrectOffsets") + logWarning(log"Found incorrect offsets in some partitions " + + log"(partition, previous offset, fetched offset): ${MDC(OFFSETS, incorrectOffsets)}") if (attempt < maxOffsetFetchAttempts) { logWarning("Retrying to fetch latest offsets because of incorrect offsets") Thread.sleep(offsetFetchAttemptIntervalMs) @@ -611,7 +612,8 @@ private[kafka010] class KafkaOffsetReaderConsumer( } catch { case NonFatal(e) => lastException = e - logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e) + logWarning( + log"Error in attempt ${MDC(RETRY_COUNT, attempt)} getting Kafka offsets: ", e) attempt += 1 Thread.sleep(offsetFetchAttemptIntervalMs) resetConsumer() diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 83ed7fff23fc..426672d2e458 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -22,7 +22,8 @@ import java.{util => ju} import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ERROR, TIP} import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql._ @@ -351,7 +352,7 @@ private[kafka010] class KafkaSource( if (failOnDataLoss) { throw getException() } else { - logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE") + logWarning(log"${MDC(ERROR, message)}. 
${MDC(TIP, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}") } } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala index 3e6831770a67..981aa71bf947 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala @@ -26,7 +26,8 @@ import scala.collection.mutable import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{DATA, KEY} import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT} import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET} import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} @@ -110,8 +111,8 @@ private[consumer] class FetchedDataPool( def release(key: CacheKey, fetchedData: FetchedData): Unit = synchronized { def warnReleasedDataNotInPool(key: CacheKey, fetchedData: FetchedData): Unit = { - logWarning(s"No matching data in pool for $fetchedData in key $key. " + - "It might be released before, or it was not a part of pool.") + logWarning(log"No matching data in pool for ${MDC(DATA, fetchedData)} in key " + + log"${MDC(KEY, key)}. 
It might be released before, or it was not a part of pool.") } cache.get(key) match { diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala index fbc4a500322e..3ea7d967744c 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala @@ -30,7 +30,8 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC, MessageWithContext} +import org.apache.spark.internal.LogKey.{ERROR, GROUP_ID, OFFSET, RANGE, TIP, TOPIC_PARTITION, UNTIL_OFFSET} import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil} import org.apache.spark.sql.kafka010.KafkaExceptions import org.apache.spark.sql.kafka010.KafkaSourceProvider._ @@ -426,7 +427,8 @@ private[kafka010] class KafkaDataConsumer( val range = timeNanos { consumer.getAvailableOffsetRange() } - logWarning(s"Some data may be lost. Recovering from the earliest offset: ${range.earliest}") + logWarning(log"Some data may be lost. Recovering from the earliest offset: " + + log"${MDC(OFFSET, range.earliest)}") val topicPartition = consumer.topicPartition val groupId = consumer.groupId @@ -444,11 +446,12 @@ private[kafka010] class KafkaDataConsumer( // | | | | // offset untilOffset earliestOffset latestOffset val warningMessage = - s""" - |The current available offset range is $range. 
- | Offset $offset is out of range, and records in [$offset, $untilOffset) will be - | skipped ${additionalWarningMessage(topicPartition, groupId)} - """.stripMargin + log""" + |The current available offset range is ${MDC(RANGE, range)}. + | Offset ${MDC(OFFSET, offset)} is out of range, and records in + | [${MDC(OFFSET, offset)}, ${MDC(UNTIL_OFFSET, untilOffset)}] will be + | skipped""".stripMargin + + additionalWarningMessage(topicPartition, groupId) logWarning(warningMessage) UNKNOWN_OFFSET } else if (offset >= range.earliest) { @@ -460,8 +463,8 @@ private[kafka010] class KafkaDataConsumer( // This will happen when a topic is deleted and recreated, and new data are pushed very fast, // then we will see `offset` disappears first then appears again. Although the parameters // are same, the state in Kafka cluster is changed, so the outer loop won't be endless. - logWarning(s"Found a disappeared offset $offset. Some data may be lost " + - s"${additionalWarningMessage(topicPartition, groupId)}") + logWarning(log"Found a disappeared offset ${MDC(OFFSET, offset)}. Some data may be lost " + + additionalWarningMessage(topicPartition, groupId)) offset } else { // ------------------------------------------------------------------------------ @@ -470,10 +473,11 @@ private[kafka010] class KafkaDataConsumer( // offset earliestOffset min(untilOffset,latestOffset) max(untilOffset, latestOffset) val warningMessage = - s""" + log""" - |The current available offset range is $range. - | Offset ${offset} is out of range, and records in [$offset, ${range.earliest}) will be - | skipped ${additionalWarningMessage(topicPartition, groupId)} - """.stripMargin + |The current available offset range is ${MDC(RANGE, range)}.
+ | Offset ${MDC(OFFSET, offset)} is out of range, and records in + | [${MDC(OFFSET, offset)}, ${MDC(UNTIL_OFFSET, range.earliest)}] will be + | skipped""".stripMargin + + additionalWarningMessage(topicPartition, groupId) logWarning(warningMessage) range.earliest } @@ -629,9 +633,10 @@ private[kafka010] class KafkaDataConsumer( */ private def additionalWarningMessage( topicPartition: TopicPartition, - groupId: String): String = { - s"(GroupId: $groupId, TopicPartition: $topicPartition). " + - s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE" + groupId: String): MessageWithContext = { + log"(GroupId: ${MDC(GROUP_ID, groupId)}, " + + log"TopicPartition: ${MDC(TOPIC_PARTITION, topicPartition)}). " + + log"${MDC(TIP, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}" } /** @@ -660,7 +665,8 @@ private[kafka010] class KafkaDataConsumer( groupId: String, message: String, cause: Throwable = null): Unit = { - val finalMessage = s"$message ${additionalWarningMessage(topicPartition, groupId)}" + val finalMessage = log"${MDC(ERROR, message)}" + + additionalWarningMessage(topicPartition, groupId) dataLoss += 1 if (cause != null) { diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala index ddde7805f1a4..f35023d744b6 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala @@ -27,7 +27,8 @@ import scala.jdk.CollectionConverters._ import org.apache.kafka.clients.producer.KafkaProducer import org.apache.spark.{SparkConf, SparkEnv} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PRODUCER_ID import org.apache.spark.kafka010.{KafkaConfigUpdater, 
KafkaRedactionUtil} import org.apache.spark.sql.kafka010.{PRODUCER_CACHE_EVICTOR_THREAD_RUN_INTERVAL, PRODUCER_CACHE_TIMEOUT} import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, ThreadUtils, Utils} @@ -96,7 +97,8 @@ private[producer] class InternalKafkaProducerPool( case Some(entry) if entry.producer.id == producer.id => entry.handleReturned(clock.nanoTime()) case _ => - logWarning(s"Released producer ${producer.id} is not a member of the cache. Closing.") + logWarning(log"Released producer ${MDC(PRODUCER_ID, producer.id)} is not " + + log"a member of the cache. Closing.") producer.close() } } diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 1c397a8d5005..8ec8f2556b9b 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -51,7 +51,8 @@ import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.ERROR import org.apache.spark.kafka010.KafkaTokenUtil import org.apache.spark.util.{SecurityUtils, ShutdownHookManager, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -332,7 +333,7 @@ class KafkaTestUtils( Utils.deleteRecursively(new File(f)) } catch { case e: IOException if Utils.isWindows => - logWarning(e.getMessage) + logWarning(log"${MDC(ERROR, e.getMessage)}") } } @@ -653,13 +654,13 @@ class KafkaTestUtils( Utils.deleteRecursively(snapshotDir) } catch { case e: IOException if Utils.isWindows => - logWarning(e.getMessage) + logWarning(log"${MDC(ERROR, e.getMessage)}") } try { Utils.deleteRecursively(logDir) } catch { case e: IOException if 
Utils.isWindows => - logWarning(e.getMessage) + logWarning(log"${MDC(ERROR, e.getMessage)}") } System.clearProperty(ZOOKEEPER_AUTH_PROVIDER) } diff --git a/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala b/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala index d1d2a031e662..d0bcf90babc1 100644 --- a/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala +++ b/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala @@ -24,7 +24,8 @@ import org.apache.hadoop.security.Credentials import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CLUSTER_ID, SERVICE_NAME} import org.apache.spark.security.HadoopDelegationTokenProvider private[spark] class KafkaDelegationTokenProvider @@ -54,9 +55,10 @@ private[spark] class KafkaDelegationTokenProvider } } catch { case NonFatal(e) => - logWarning(s"Failed to get token from service: $serviceName due to $e on " + - s"cluster: ${clusterConf.identifier}. If $serviceName is not used, " + - s"set spark.security.credentials.$serviceName.enabled to false") + logWarning(log"Failed to get token from service: ${MDC(SERVICE_NAME, serviceName)} " + + log"on cluster: ${MDC(CLUSTER_ID, clusterConf.identifier)}. 
If " + + log"${MDC(SERVICE_NAME, serviceName)} is not used, please set " + + log"spark.security.credentials.${MDC(SERVICE_NAME, serviceName)}.enabled to false", e) } } } catch { diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 693ddd31d9a8..2bc2acf9aaf9 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._ import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CONFIG import org.apache.spark.kafka010.KafkaConfigUpdater /** @@ -107,8 +108,8 @@ private case class Subscribe[K, V]( consumer.poll(0) } catch { case x: NoOffsetForPartitionException if shouldSuppress => - logWarning("Catching NoOffsetForPartitionException since " + - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370") + logWarning(log"Catching NoOffsetForPartitionException since " + + log"${MDC(CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)} is none. See KAFKA-3370") } toSeek.asScala.foreach { case (topicPartition, offset) => consumer.seek(topicPartition, offset) @@ -161,8 +162,8 @@ private case class SubscribePattern[K, V]( consumer.poll(0) } catch { case x: NoOffsetForPartitionException if shouldSuppress => - logWarning("Catching NoOffsetForPartitionException since " + - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370") + logWarning(log"Catching NoOffsetForPartitionException since " + + log"${MDC(CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)} is none. 
See KAFKA-3370") } toSeek.asScala.foreach { case (topicPartition, offset) => consumer.seek(topicPartition, offset) diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala index c7ac6a8bf744..6b47e9d72f4b 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -26,7 +26,8 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaC import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{KEY, MAX_CAPACITY} import org.apache.spark.kafka010.KafkaConfigUpdater private[kafka010] sealed trait KafkaDataConsumer[K, V] { @@ -256,8 +257,8 @@ private[kafka010] object KafkaDataConsumer extends Logging { if (entry.getValue.inUse == false && this.size > maxCapacity) { logWarning( - s"KafkaConsumer cache hitting max capacity of $maxCapacity, " + - s"removing consumer for ${entry.getKey}") + log"KafkaConsumer cache hitting max capacity of ${MDC(MAX_CAPACITY, maxCapacity)}, " + + log"removing consumer for ${MDC(KEY, entry.getKey)}") try { entry.getValue.close() } catch { diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala index 731d06fd95fa..f3e4c45b3aa9 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala @@ -25,7 +25,7 @@ import org.apache.kafka.common.TopicPartition import 
org.apache.spark.SparkContext import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext } import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.GROUP_ID +import org.apache.spark.internal.LogKey.{CONFIG, GROUP_ID} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext } @@ -184,26 +184,30 @@ object KafkaUtils extends Logging { * Tweak kafka params to prevent issues on executors */ private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = { - logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor") + logWarning(log"overriding ${MDC(CONFIG, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)} " + + log"to false for executor") kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean) - logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor") + logWarning(log"overriding ${MDC(CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)} " + + log"to none for executor") kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") // driver and executor should be in different consumer groups val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) if (null == originalGroupId) { - logError(log"${MDC(GROUP_ID, ConsumerConfig.GROUP_ID_CONFIG)} is null, " + + logError(log"${MDC(CONFIG, ConsumerConfig.GROUP_ID_CONFIG)} is null, " + log"you should probably set it") } val groupId = "spark-executor-" + originalGroupId - logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}") + logWarning(log"overriding executor ${MDC(CONFIG, ConsumerConfig.GROUP_ID_CONFIG)} " + + log"to ${MDC(GROUP_ID, groupId)}") kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) // possible workaround for KAFKA-3135 val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG) if (null == rbb || 
rbb.asInstanceOf[java.lang.Integer] < 65536) { - logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135") + logWarning(log"overriding ${MDC(CONFIG, ConsumerConfig.RECEIVE_BUFFER_CONFIG)} " + + log"to 65536 see KAFKA-3135") kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) } } diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index d22496a84b58..6b0c091534b7 100644 --- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -29,7 +29,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord import com.amazonaws.services.kinesis.model._ import org.apache.spark._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ERROR, RETRY_COUNT} import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition} import org.apache.spark.storage.BlockId import org.apache.spark.util.NextIterator @@ -277,7 +278,8 @@ class KinesisSequenceRangeIterator( lastError = t t match { case ptee: ProvisionedThroughputExceededException => - logWarning(s"Error while $message [attempt = ${retryCount + 1}]", ptee) + logWarning(log"Error while ${MDC(ERROR, message)} " + + log"[attempt = ${MDC(RETRY_COUNT, retryCount + 1)}]", ptee) case e: Throwable => throw new SparkException(s"Error while $message", e) } diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index e1e21954ce88..94e109680fbc 100644 --- 
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -120,7 +120,7 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason") // null if not initialized before shutdown: if (shardId == null) { - logWarning(s"No shardId for workerId $workerId?") + logWarning(log"No shardId for workerId ${MDC(WORKER_URL, workerId)}?") } else { reason match { /* diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 406c19be9bff..cd4c61396a12 100644 --- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -33,7 +33,8 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient} import com.amazonaws.services.kinesis.model._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{STREAM_NAME, TABLE_NAME} /** * Shared utility methods for performing Kinesis tests that actually transfer data. 
@@ -147,7 +148,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi } } catch { case e: Exception => - logWarning(s"Could not delete stream $streamName") + logWarning(log"Could not delete stream ${MDC(STREAM_NAME, streamName)}", e) } } @@ -158,7 +159,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi table.waitForDelete() } catch { case e: Exception => - logWarning(s"Could not delete DynamoDB table $tableName") + logWarning(log"Could not delete DynamoDB table ${MDC(TABLE_NAME, tableName)}", e) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org