This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 56ddf50  [SPARK-38411][CORE] Use `UTF-8` when `doMergeApplicationListingInternal` reads event logs
56ddf50 is described below

commit 56ddf50e20bd38f37d6d037b97c1b1d59100116b
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Sun Mar 6 15:41:20 2022 -0800

[SPARK-38411][CORE] Use `UTF-8` when `doMergeApplicationListingInternal` reads event logs

### What changes were proposed in this pull request?

Use UTF-8 instead of the system default encoding to read event logs.

### Why are the changes needed?

After SPARK-29160, we should always use UTF-8 to read event logs; otherwise, if the Spark History Server runs with a default charset other than UTF-8, it will encounter the following error.

```
2022-03-04 12:16:00,143 [3752440] - INFO [log-replay-executor-19:Logging57] - Parsing hdfs://hz-cluster11/spark2-history/application_1640597251469_2453817_1.lz4 for listing data...
2022-03-04 12:16:00,145 [3752442] - ERROR [log-replay-executor-18:Logging94] - Exception while merging application listings
java.nio.charset.MalformedInputException: Input length = 1
    at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74)
    at scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:884)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
    at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:82)
    at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListing$4(FsHistoryProvider.scala:819)
    at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListing$4$adapted(FsHistoryProvider.scala:801)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2626)
    at org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListing(FsHistoryProvider.scala:801)
    at org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:715)
    at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:581)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
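For context: without an explicit codec, `scala.io.Source` decodes with `Codec.default`, which follows the JVM's `file.encoding` (derived from the locale, e.g. US-ASCII under `LC_ALL=POSIX`). Below is a minimal, self-contained sketch of the failure mode and of the fix pattern; the demo object and its names are illustrative only and are not part of the Spark code base or this patch.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets

import scala.io.{Codec, Source}

// Illustrative only: event logs are written as UTF-8 (SPARK-29160), so
// non-ASCII values such as "计算圆周率" are multi-byte sequences on disk.
object Utf8EventLogReadDemo {
  def main(args: Array[String]): Unit = {
    val bytes = "计算圆周率".getBytes(StandardCharsets.UTF_8)

    // Source.fromInputStream(in) with no codec argument uses Codec.default,
    // i.e. the JVM default charset; under a POSIX locale that is US-ASCII,
    // and decoding these UTF-8 bytes fails with
    // java.nio.charset.MalformedInputException: Input length = 1.

    // Passing Codec.UTF8 explicitly makes the read locale-independent:
    val line = Source.fromInputStream(new ByteArrayInputStream(bytes))(Codec.UTF8)
      .getLines().mkString
    assert(line == "计算圆周率")
  }
}
```

The diff below applies this same `Source.fromInputStream(...)(Codec.UTF8)` pattern to `FsHistoryProvider` and to the test suites that read event logs.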
### Does this PR introduce _any_ user-facing change?

Yes, bug fix.

### How was this patch tested?

Verification steps in ubuntu:20.04

1. build `spark-3.3.0-SNAPSHOT-bin-master.tgz` on commit `34618a7ef6` using `dev/make-distribution.sh --tgz --name master`
2. build `spark-3.3.0-SNAPSHOT-bin-SPARK-38411.tgz` on commit `2a8f56038b` using `dev/make-distribution.sh --tgz --name SPARK-38411`
3. switch to UTF-8 using `export LC_ALL=C.UTF-8 && bash`
4. generate an event log that contains non-ASCII chars

```
bin/spark-submit \
  --master local[*] \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.eventLog.enabled=true \
  --conf spark.user.key='计算圆周率' \
  examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar
```

5. switch to POSIX using `export LC_ALL=POSIX && bash`
6. run `spark-3.3.0-SNAPSHOT-bin-master/sbin/start-history-server.sh` and watch logs

<details>

```
Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /spark-3.3.0-SNAPSHOT-bin-master/conf/:/spark-3.3.0-SNAPSHOT-bin-master/jars/* -Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
22/03/06 13:37:19 INFO HistoryServer: Started daemon with process name: 487@29c3ffc10aa9
22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for TERM
22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for HUP
22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for INT
22/03/06 13:37:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/06 13:37:21 INFO SecurityManager: Changing view acls to: root
22/03/06 13:37:21 INFO SecurityManager: Changing modify acls to: root
22/03/06 13:37:21 INFO SecurityManager: Changing view acls groups to:
22/03/06 13:37:21 INFO SecurityManager: Changing modify acls groups to:
22/03/06 13:37:21 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
22/03/06 13:37:21 INFO FsHistoryProvider: History server ui acls disabled; users with admin permissions: ; groups with admin permissions:
22/03/06 13:37:22 INFO Utils: Successfully started service 'HistoryServerUI' on port 18080.
22/03/06 13:37:23 INFO HistoryServer: Bound HistoryServer to 0.0.0.0, and started at http://29c3ffc10aa9:18080
22/03/06 13:37:23 INFO FsHistoryProvider: Parsing file:/tmp/spark-events/local-1646573251839 for listing data...
22/03/06 13:37:25 ERROR FsHistoryProvider: Exception while merging application listings
java.nio.charset.MalformedInputException: Input length = 1
    at java.nio.charset.CoderResult.throwException(CoderResult.java:281) ~[?:1.8.0_312]
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339) ~[?:1.8.0_312]
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[?:1.8.0_312]
    at java.io.InputStreamReader.read(InputStreamReader.java:184) ~[?:1.8.0_312]
    at java.io.BufferedReader.fill(BufferedReader.java:161) ~[?:1.8.0_312]
    at java.io.BufferedReader.readLine(BufferedReader.java:324) ~[?:1.8.0_312]
    at java.io.BufferedReader.readLine(BufferedReader.java:389) ~[?:1.8.0_312]
    at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74) ~[scala-library-2.12.15.jar:?]
    at scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:886) ~[scala-library-2.12.15.jar:?]
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:82) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListingInternal$4(FsHistoryProvider.scala:830) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListingInternal$4$adapted(FsHistoryProvider.scala:812) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2738) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    at org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListingInternal(FsHistoryProvider.scala:812) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    at org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListing(FsHistoryProvider.scala:758) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    at org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:718) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:584) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_312]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_312]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
```

</details>

7. run `spark-3.3.0-SNAPSHOT-bin-master/sbin/stop-history-server.sh`
8. run `spark-3.3.0-SNAPSHOT-bin-SPARK-38411/sbin/start-history-server.sh` and watch logs

<details>

```
Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /spark-3.3.0-SNAPSHOT-bin-SPARK-38411/conf/:/spark-3.3.0-SNAPSHOT-bin-SPARK-38411/jars/* -Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
22/03/06 13:30:54 INFO HistoryServer: Started daemon with process name: 347@29c3ffc10aa9
22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for TERM
22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for HUP
22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for INT
22/03/06 13:30:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/06 13:30:56 INFO SecurityManager: Changing view acls to: root
22/03/06 13:30:56 INFO SecurityManager: Changing modify acls to: root
22/03/06 13:30:56 INFO SecurityManager: Changing view acls groups to:
22/03/06 13:30:56 INFO SecurityManager: Changing modify acls groups to:
22/03/06 13:30:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
22/03/06 13:30:56 INFO FsHistoryProvider: History server ui acls disabled; users with admin permissions: ; groups with admin permissions:
22/03/06 13:30:57 INFO Utils: Successfully started service 'HistoryServerUI' on port 18080.
22/03/06 13:30:57 INFO HistoryServer: Bound HistoryServer to 0.0.0.0, and started at http://29c3ffc10aa9:18080
22/03/06 13:30:57 INFO FsHistoryProvider: Parsing file:/tmp/spark-events/local-1646573251839 for listing data...
22/03/06 13:30:59 INFO FsHistoryProvider: Finished parsing file:/tmp/spark-events/local-1646573251839
```

</details>

Closes #35730 from pan3793/SPARK-38411.

Authored-by: Cheng Pan <cheng...@apache.org>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
(cherry picked from commit 135841f257fbb008aef211a5e38222940849cb26)
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../scala/org/apache/spark/deploy/history/FsHistoryProvider.scala    | 4 ++--
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala   | 4 ++--
 .../org/apache/spark/deploy/history/EventLogFileWritersSuite.scala   | 4 ++--
 .../scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 4 ++--
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c67122c..6e2471d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -26,7 +26,7 @@ import java.util.zip.ZipOutputStream

 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.io.Source
+import scala.io.{Codec, Source}
 import scala.util.control.NonFatal
 import scala.xml.Node

@@ -803,7 +803,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }

-    val source = Source.fromInputStream(in).getLines()
+    val source = Source.fromInputStream(in)(Codec.UTF8).getLines()

     // Because skipping may leave the stream in the middle of a line, read the next line
     // before replaying.
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index c4e3d6ae..e4e8b72 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}

 import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
+import scala.io.{Codec, Source}

 import com.google.common.io.ByteStreams
 import org.apache.commons.io.FileUtils
@@ -647,7 +647,7 @@ class SparkSubmitSuite
       runSparkSubmit(args)
       val listStatus = fileSystem.listStatus(testDirPath)
       val logData = EventLogFileReader.openEventLog(listStatus.last.getPath, fileSystem)
-      Source.fromInputStream(logData).getLines().foreach { line =>
+      Source.fromInputStream(logData)(Codec.UTF8).getLines().foreach { line =>
         assert(!line.contains("secret_password"))
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index e6dd9ae..455e2e1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -21,7 +21,7 @@ import java.io.{File, FileOutputStream, IOException}
 import java.net.URI

 import scala.collection.mutable
-import scala.io.Source
+import scala.io.{Codec, Source}

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -114,7 +114,7 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
   protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): List[String] = {
     val logDataStream = EventLogFileReader.openEventLog(log, fs)
     try {
-      Source.fromInputStream(logDataStream).getLines().toList
+      Source.fromInputStream(logDataStream)(Codec.UTF8).getLines().toList
     } finally {
       logDataStream.close()
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 09ad223..509af4b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -23,7 +23,7 @@ import java.util.{Arrays, Properties}
 import scala.collection.immutable.Map
 import scala.collection.mutable
 import scala.collection.mutable.Set
-import scala.io.Source
+import scala.io.{Codec, Source}

 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
@@ -661,7 +661,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   }

   private def readLines(in: InputStream): Seq[String] = {
-    Source.fromInputStream(in).getLines().toSeq
+    Source.fromInputStream(in)(Codec.UTF8).getLines().toSeq
   }

   /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org