This is an automated email from the ASF dual-hosted git repository. jiangxb1987 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d19b173 [SPARK-31764][CORE] JsonProtocol doesn't write RDDInfo#isBarrier d19b173 is described below commit d19b173b47af04fe6f03e2b21b60eb317aeaae4f Author: Kousuke Saruta <saru...@oss.nttdata.com> AuthorDate: Wed May 27 14:36:12 2020 -0700 [SPARK-31764][CORE] JsonProtocol doesn't write RDDInfo#isBarrier ### What changes were proposed in this pull request? This PR changes JsonProtocol to write RDDInfo#isBarrier. ### Why are the changes needed? JsonProtocol reads RDDInfo#isBarrier but does not write it, so it's a bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added a test case. Closes #28583 from sarutak/SPARK-31764. Authored-by: Kousuke Saruta <saru...@oss.nttdata.com> Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com> --- .../scala/org/apache/spark/util/JsonProtocol.scala | 1 + .../scheduler/EventLoggingListenerSuite.scala | 44 ++++++++++++++++++++++ .../org/apache/spark/util/JsonProtocolSuite.scala | 11 ++++++ 3 files changed, 56 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 26bbff5..844d9b7 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -487,6 +487,7 @@ private[spark] object JsonProtocol { ("Callsite" -> rddInfo.callSite) ~ ("Parent IDs" -> parentIds) ~ ("Storage Level" -> storageLevel) ~ + ("Barrier" -> rddInfo.isBarrier) ~ ("Number of Partitions" -> rddInfo.numPartitions) ~ ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~ ("Memory Size" -> rddInfo.memSize) ~ diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 61ea21f..7c23e44 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -36,6 +36,7 @@ import org.apache.spark.deploy.history.{EventLogFileReader, SingleEventLogFileWr import org.apache.spark.deploy.history.EventLogTestHelper._ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{EVENT_LOG_DIR, EVENT_LOG_ENABLED} import org.apache.spark.io._ import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem} import org.apache.spark.resource.ResourceProfile @@ -100,6 +101,49 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit testStageExecutorMetricsEventLogging() } + test("SPARK-31764: isBarrier should be logged in event log") { + val conf = new SparkConf() + conf.set(EVENT_LOG_ENABLED, true) + conf.set(EVENT_LOG_DIR, testDirPath.toString) + val sc = new SparkContext("local", "test-SPARK-31764", conf) + val appId = sc.applicationId + + sc.parallelize(1 to 10) + .barrier() + .mapPartitions(_.map(elem => (elem, elem))) + .filter(elem => elem._1 % 2 == 0) + .reduceByKey(_ + _) + .collect + sc.stop() + + val eventLogStream = EventLogFileReader.openEventLog(new Path(testDirPath, appId), fileSystem) + val events = readLines(eventLogStream).map(line => JsonProtocol.sparkEventFromJson(parse(line))) + val jobStartEvents = events + .filter(event => event.isInstanceOf[SparkListenerJobStart]) + .map(_.asInstanceOf[SparkListenerJobStart]) + + assert(jobStartEvents.size === 1) + val stageInfos = jobStartEvents.head.stageInfos + assert(stageInfos.size === 2) + + val stage0 = stageInfos(0) + val rddInfosInStage0 = stage0.rddInfos + assert(rddInfosInStage0.size === 3) + val sortedRddInfosInStage0 = rddInfosInStage0.sortBy(_.scope.get.name) + assert(sortedRddInfosInStage0(0).scope.get.name === "filter") + assert(sortedRddInfosInStage0(0).isBarrier === true) + 
assert(sortedRddInfosInStage0(1).scope.get.name === "mapPartitions") + assert(sortedRddInfosInStage0(1).isBarrier === true) + assert(sortedRddInfosInStage0(2).scope.get.name === "parallelize") + assert(sortedRddInfosInStage0(2).isBarrier === false) + + val stage1 = stageInfos(1) + val rddInfosInStage1 = stage1.rddInfos + assert(rddInfosInStage1.size === 1) + assert(rddInfosInStage1(0).scope.get.name === "reduceByKey") + assert(rddInfosInStage1(0).isBarrier === false) // reduceByKey + } + /* ----------------- * * Actual test logic * * ----------------- */ diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index bc7f8b5..248142a 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -1100,6 +1100,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Deserialized": true, | "Replication": 1 | }, + | "Barrier" : false, | "Number of Partitions": 201, | "Number of Cached Partitions": 301, | "Memory Size": 401, @@ -1623,6 +1624,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Deserialized": true, | "Replication": 1 | }, + | "Barrier" : false, | "Number of Partitions": 200, | "Number of Cached Partitions": 300, | "Memory Size": 400, @@ -1668,6 +1670,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Deserialized": true, | "Replication": 1 | }, + | "Barrier" : false, | "Number of Partitions": 400, | "Number of Cached Partitions": 600, | "Memory Size": 800, @@ -1684,6 +1687,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Deserialized": true, | "Replication": 1 | }, + | "Barrier" : false, | "Number of Partitions": 401, | "Number of Cached Partitions": 601, | "Memory Size": 801, @@ -1729,6 +1733,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Deserialized": true, | "Replication": 1 | }, + | 
"Barrier" : false, | "Number of Partitions": 600, | "Number of Cached Partitions": 900, | "Memory Size": 1200, @@ -1745,6 +1750,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Deserialized": true, | "Replication": 1 | }, + | "Barrier" : false, | "Number of Partitions": 601, | "Number of Cached Partitions": 901, | "Memory Size": 1201, @@ -1761,6 +1767,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Deserialized": true, | "Replication": 1 | }, + | "Barrier" : false, | "Number of Partitions": 602, | "Number of Cached Partitions": 902, | "Memory Size": 1202, @@ -1806,6 +1813,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Deserialized": true, | "Replication": 1 | }, + | "Barrier" : false, | "Number of Partitions": 800, | "Number of Cached Partitions": 1200, | "Memory Size": 1600, @@ -1822,6 +1830,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Deserialized": true, | "Replication": 1 | }, + | "Barrier" : false, | "Number of Partitions": 801, | "Number of Cached Partitions": 1201, | "Memory Size": 1601, @@ -1838,6 +1847,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Deserialized": true, | "Replication": 1 | }, + | "Barrier" : false, | "Number of Partitions": 802, | "Number of Cached Partitions": 1202, | "Memory Size": 1602, @@ -1854,6 +1864,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Deserialized": true, | "Replication": 1 | }, + | "Barrier" : false, | "Number of Partitions": 803, | "Number of Cached Partitions": 1203, | "Memory Size": 1603, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org