svn commit: r28552 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_20_02-4c27663-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Sat Aug 4 03:16:11 2018 New Revision: 28552 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_20_02-4c27663 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
Repository: spark Updated Branches: refs/heads/master 8c14276c3 -> 4c27663cb [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0 ## What changes were proposed in this pull request? Increase ZK timeout and harmonize configs across Kafka tests to resolve potentially flaky test failures ## How was this patch tested? Existing tests Author: Sean Owen Closes #21995 from srowen/SPARK-18057.3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c27663c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c27663c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c27663c Branch: refs/heads/master Commit: 4c27663cb20f3cde7317ffcb2c9d42257a40057f Parents: 8c14276 Author: Sean Owen Authored: Fri Aug 3 16:22:54 2018 -0700 Committer: Shixiong Zhu Committed: Fri Aug 3 16:22:54 2018 -0700 -- .../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 1 + .../org/apache/spark/streaming/kafka010/KafkaTestUtils.scala | 6 +- 2 files changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4c27663c/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 8229490..d89cccd 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -304,6 +304,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L props.put("port", brokerPort.toString) props.put("log.dir", Utils.createTempDir().getAbsolutePath) props.put("zookeeper.connect", zkAddress) +props.put("zookeeper.connection.timeout.ms", "6") props.put("log.flush.interval.messages", "1") 
props.put("replica.socket.timeout.ms", "1500") props.put("delete.topic.enable", "true") http://git-wip-us.apache.org/repos/asf/spark/blob/4c27663c/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala -- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 2315baf..eef4c55 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -32,6 +32,7 @@ import kafka.api.Request import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.ZkUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.serialization.StringSerializer import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} @@ -109,7 +110,7 @@ private[kafka010] class KafkaTestUtils extends Logging { brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) server = new KafkaServer(brokerConf) server.startup() - brokerPort = server.boundPort(brokerConf.interBrokerListenerName) + brokerPort = server.boundPort(new ListenerName("PLAINTEXT")) (server, brokerPort) }, new SparkConf(), "KafkaBroker") @@ -220,8 +221,11 @@ private[kafka010] class KafkaTestUtils extends Logging { props.put("port", brokerPort.toString) props.put("log.dir", brokerLogDir) props.put("zookeeper.connect", zkAddress) +props.put("zookeeper.connection.timeout.ms", "6") props.put("log.flush.interval.messages", "1") props.put("replica.socket.timeout.ms", "1500") +props.put("delete.topic.enable", "true") +props.put("offsets.topic.num.partitions", "1") props.put("offsets.topic.replication.factor", "1") props.put("group.initial.rebalance.delay.ms", "10") props - To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r28548 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_16_02-8c14276-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 23:15:56 2018 New Revision: 28548 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_16_02-8c14276 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Little typo
Repository: spark Updated Branches: refs/heads/master 92b48842b -> 8c14276c3 Little typo ## What changes were proposed in this pull request? Fixed little typo for a comment ## How was this patch tested? Manual test Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Onwuka Gideon Closes #21992 from dongido001/patch-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c14276c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c14276c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c14276c Branch: refs/heads/master Commit: 8c14276c3362798b030db7a9fcdc31a10d04b643 Parents: 92b4884 Author: Onwuka Gideon Authored: Fri Aug 3 17:39:40 2018 -0500 Committer: Sean Owen Committed: Fri Aug 3 17:39:40 2018 -0500 -- python/pyspark/streaming/context.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8c14276c/python/pyspark/streaming/context.py -- diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index a451582..3fa57ca 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -222,7 +222,7 @@ class StreamingContext(object): Set each DStreams in this context to remember RDDs it generated in the last given duration. DStreams remember RDDs only for a limited duration of time and releases them for garbage collection. -This method allows the developer to specify how to long to remember +This method allows the developer to specify how long to remember the RDDs (if the developer wishes to query old data outside the DStream computation). @@ -287,7 +287,7 @@ class StreamingContext(object): def queueStream(self, rdds, oneAtATime=True, default=None): """ -Create an input stream from an queue of RDDs or list. In each batch, +Create an input stream from a queue of RDDs or list. 
In each batch, it will process either one or all of the RDDs returned by the queue. .. note:: Changes to the queue after the stream is created will not be recognized. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [PYSPARK] Updates to Accumulators
Repository: spark Updated Branches: refs/heads/branch-2.1 a3eb07db3 -> b2e0f68f6 [PYSPARK] Updates to Accumulators (cherry picked from commit 15fc2372269159ea2556b028d4eb8860c4108650) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2e0f68f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2e0f68f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2e0f68f Branch: refs/heads/branch-2.1 Commit: b2e0f68f615cbe2cf74f9813ece76c311fe8e911 Parents: a3eb07d Author: LucaCanali Authored: Wed Jul 18 23:19:02 2018 +0200 Committer: Imran Rashid Committed: Fri Aug 3 16:30:40 2018 -0500 -- .../org/apache/spark/api/python/PythonRDD.scala | 12 +++-- python/pyspark/accumulators.py | 53 +++- python/pyspark/context.py | 5 +- 3 files changed, 53 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b2e0f68f/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index b1190b9..de548e8 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -886,8 +886,9 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By */ private[spark] class PythonAccumulatorV2( @transient private val serverHost: String, -private val serverPort: Int) - extends CollectionAccumulator[Array[Byte]] { +private val serverPort: Int, +private val secretToken: String) + extends CollectionAccumulator[Array[Byte]] with Logging{ Utils.checkHost(serverHost, "Expected hostname") @@ -902,12 +903,17 @@ private[spark] class PythonAccumulatorV2( private def openSocket(): Socket = synchronized { if (socket == null || socket.isClosed) { socket = new Socket(serverHost, serverPort) + logInfo(s"Connected to AccumulatorServer at host: $serverHost port: 
$serverPort") + // send the secret just for the initial authentication when opening a new connection + socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8)) } socket } // Need to override so the types match with PythonFunction - override def copyAndReset(): PythonAccumulatorV2 = new PythonAccumulatorV2(serverHost, serverPort) + override def copyAndReset(): PythonAccumulatorV2 = { +new PythonAccumulatorV2(serverHost, serverPort, secretToken) + } override def merge(other: AccumulatorV2[Array[Byte], JList[Array[Byte]]]): Unit = synchronized { val otherPythonAccumulator = other.asInstanceOf[PythonAccumulatorV2] http://git-wip-us.apache.org/repos/asf/spark/blob/b2e0f68f/python/pyspark/accumulators.py -- diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 6ef8cf5..bc0be07 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -228,20 +228,49 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): def handle(self): from pyspark.accumulators import _accumulatorRegistry -while not self.server.server_shutdown: -# Poll every 1 second for new data -- don't block in case of shutdown. -r, _, _ = select.select([self.rfile], [], [], 1) -if self.rfile in r: -num_updates = read_int(self.rfile) -for _ in range(num_updates): -(aid, update) = pickleSer._read_with_length(self.rfile) -_accumulatorRegistry[aid] += update -# Write a byte in acknowledgement -self.wfile.write(struct.pack("!b", 1)) +auth_token = self.server.auth_token + +def poll(func): +while not self.server.server_shutdown: +# Poll every 1 second for new data -- don't block in case of shutdown. 
+r, _, _ = select.select([self.rfile], [], [], 1) +if self.rfile in r: +if func(): +break + +def accum_updates(): +num_updates = read_int(self.rfile) +for _ in range(num_updates): +(aid, update) = pickleSer._read_with_length(self.rfile) +_accumulatorRegistry[aid] += update +# Write a byte in acknowledgement +self.wfile.write(struct.pack("!b", 1)) +return False + +def authenticate_and_accum_updates(): +received_token = self.rfile.read(len(auth_token)) +if
spark git commit: [PYSPARK] Updates to Accumulators
Repository: spark Updated Branches: refs/heads/branch-2.2 22ce8051f -> a5624c7ae [PYSPARK] Updates to Accumulators (cherry picked from commit 15fc2372269159ea2556b028d4eb8860c4108650) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5624c7a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5624c7a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5624c7a Branch: refs/heads/branch-2.2 Commit: a5624c7ae29d6d49117dd78642879bf978212d30 Parents: 22ce805 Author: LucaCanali Authored: Wed Jul 18 23:19:02 2018 +0200 Committer: Imran Rashid Committed: Fri Aug 3 14:49:38 2018 -0500 -- .../org/apache/spark/api/python/PythonRDD.scala | 12 +++-- python/pyspark/accumulators.py | 53 +++- python/pyspark/context.py | 5 +- 3 files changed, 53 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a5624c7a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 0662792..7b5a179 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -900,8 +900,9 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By */ private[spark] class PythonAccumulatorV2( @transient private val serverHost: String, -private val serverPort: Int) - extends CollectionAccumulator[Array[Byte]] { +private val serverPort: Int, +private val secretToken: String) + extends CollectionAccumulator[Array[Byte]] with Logging{ Utils.checkHost(serverHost, "Expected hostname") @@ -916,12 +917,17 @@ private[spark] class PythonAccumulatorV2( private def openSocket(): Socket = synchronized { if (socket == null || socket.isClosed) { socket = new Socket(serverHost, serverPort) + logInfo(s"Connected to AccumulatorServer at host: $serverHost port: 
$serverPort") + // send the secret just for the initial authentication when opening a new connection + socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8)) } socket } // Need to override so the types match with PythonFunction - override def copyAndReset(): PythonAccumulatorV2 = new PythonAccumulatorV2(serverHost, serverPort) + override def copyAndReset(): PythonAccumulatorV2 = { +new PythonAccumulatorV2(serverHost, serverPort, secretToken) + } override def merge(other: AccumulatorV2[Array[Byte], JList[Array[Byte]]]): Unit = synchronized { val otherPythonAccumulator = other.asInstanceOf[PythonAccumulatorV2] http://git-wip-us.apache.org/repos/asf/spark/blob/a5624c7a/python/pyspark/accumulators.py -- diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 6ef8cf5..bc0be07 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -228,20 +228,49 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): def handle(self): from pyspark.accumulators import _accumulatorRegistry -while not self.server.server_shutdown: -# Poll every 1 second for new data -- don't block in case of shutdown. -r, _, _ = select.select([self.rfile], [], [], 1) -if self.rfile in r: -num_updates = read_int(self.rfile) -for _ in range(num_updates): -(aid, update) = pickleSer._read_with_length(self.rfile) -_accumulatorRegistry[aid] += update -# Write a byte in acknowledgement -self.wfile.write(struct.pack("!b", 1)) +auth_token = self.server.auth_token + +def poll(func): +while not self.server.server_shutdown: +# Poll every 1 second for new data -- don't block in case of shutdown. 
+r, _, _ = select.select([self.rfile], [], [], 1) +if self.rfile in r: +if func(): +break + +def accum_updates(): +num_updates = read_int(self.rfile) +for _ in range(num_updates): +(aid, update) = pickleSer._read_with_length(self.rfile) +_accumulatorRegistry[aid] += update +# Write a byte in acknowledgement +self.wfile.write(struct.pack("!b", 1)) +return False + +def authenticate_and_accum_updates(): +received_token = self.rfile.read(len(auth_token)) +if
svn commit: r28538 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_12_01-92b4884-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 19:16:09 2018 New Revision: 28538 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_12_01-92b4884 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24954][CORE] Fail fast on job submit if run a barrier stage with dynamic resource allocation enabled
Repository: spark Updated Branches: refs/heads/master c32dbd6bd -> 92b48842b [SPARK-24954][CORE] Fail fast on job submit if run a barrier stage with dynamic resource allocation enabled ## What changes were proposed in this pull request? We don't support running a barrier stage with dynamic resource allocation enabled, as it can lead to confusing behaviors (e.g. with dynamic resource allocation enabled, we may acquire some executors (but not enough to launch all the tasks in a barrier stage), later release them because the executor idle timeout expires, and then acquire them again). We perform the check on job submit and fail fast if running a barrier stage with dynamic resource allocation enabled. ## How was this patch tested? Added new test suite `BarrierStageOnSubmittedSuite` to cover all the fail-fast cases when submitting a job containing one or more barrier stages. Author: Xingbo Jiang Closes #21915 from jiangxb1987/SPARK-24954. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92b48842 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92b48842 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92b48842 Branch: refs/heads/master Commit: 92b48842b944a3e430472294cdc3c481bad6b804 Parents: c32dbd6 Author: Xingbo Jiang Authored: Fri Aug 3 09:36:56 2018 -0700 Committer: Xiangrui Meng Committed: Fri Aug 3 09:36:56 2018 -0700 -- .../apache/spark/scheduler/DAGScheduler.scala | 25 + .../spark/BarrierStageOnSubmittedSuite.scala| 57 2 files changed, 71 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/92b48842/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3dd0718..cf1fcbc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -364,6 +364,7 @@ class DAGScheduler( */ def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd +checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions) val numTasks = rdd.partitions.length val parents = getOrCreateParentStages(rdd, jobId) @@ -385,6 +386,23 @@ class DAGScheduler( } /** + * We don't support run a barrier stage with dynamic resource allocation enabled, it shall lead + * to some confusing behaviors (eg. with dynamic resource allocation enabled, it may happen that + * we acquire some executors (but not enough to launch all the tasks in a barrier stage) and + * later release them due to executor idle time expire, and then acquire again). + * + * We perform the check on job submit and fail fast if running a barrier stage with dynamic + * resource allocation enabled. + * + * TODO SPARK-24942 Improve cluster resource management with jobs containing barrier stage + */ + private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = { +if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) { + throw new SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION) +} + } + + /** * Create a ResultStage associated with the provided jobId. */ private def createResultStage( @@ -393,6 +411,7 @@ class DAGScheduler( partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { +checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() @@ -2001,4 +2020,10 @@ private[spark] object DAGScheduler { "PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " + "(scala) or barrierRdd.collect()[0] (python).\n" + "2. An RDD that depends on multiple barrier RDDs (eg. 
barrierRdd1.zip(barrierRdd2))." + + // Error message when running a barrier stage with dynamic resource allocation enabled. + val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION = +"[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " + + "now. You can disable dynamic resource allocation by setting Spark conf " + + "\"spark.dynamicAllocation.enabled\" to \"false\"." } http://git-wip-us.apache.org/repos/asf/spark/blob/92b48842/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
svn commit: r28535 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_08_02-c32dbd6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 15:15:56 2018 New Revision: 28535 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_08_02-c32dbd6 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r28529 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_04_02-273b284-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 11:20:42 2018 New Revision: 28529 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_04_02-273b284 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24993][SQL] Make Avro Fast Again
Repository: spark Updated Branches: refs/heads/master 53ca9755d -> 273b28404 [SPARK-24993][SQL] Make Avro Fast Again ## What changes were proposed in this pull request? When lindblombr at apple developed [SPARK-24855](https://github.com/apache/spark/pull/21847) to support specified schema on write, we found a performance regression in Avro writer for our dataset. With this PR, the performance is improved, but not as good as Spark 2.3 + the old avro writer. There must be something we miss which we need to investigate further. Spark 2.4 ``` spark git:(master) ./build/mvn -DskipTests clean package spark git:(master) bin/spark-shell --jars external/avro/target/spark-avro_2.11-2.4.0-SNAPSHOT.jar ``` Spark 2.3 + databricks avro ``` spark git:(branch-2.3) ./build/mvn -DskipTests clean package spark git:(branch-2.3) bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0 ``` Current master: ``` +---++ |summary| writeTimes| +---++ | count| 100| | mean| 2.95621| | stddev|0.030895815479469294| |min| 2.915| |max| 3.049| +---++ +---++ |summary| readTimes| +---++ | count| 100| | mean| 0.310729995| | stddev|0.054139709842390006| |min| 0.259| |max| 0.692| +---++ ``` Current master with this PR: ``` +---++ |summary| writeTimes| +---++ | count| 100| | mean| 2.58043002| | stddev|0.011175600225672079| |min| 2.558| |max|2.62| +---++ +---++ |summary| readTimes| +---++ | count| 100| | mean| 0.299220004| | stddev|0.058261961532514166| |min| 0.251| |max| 0.732| +---++ ``` Spark 2.3 + databricks avro: ``` +---++ |summary| writeTimes| +---++ | count| 100| | mean| 1.77305005| | stddev|0.025199156230863575| |min| 1.729| |max| 1.833| +---++ +---+---+ |summary| readTimes| +---+---+ | count|100| | mean|0.29715| | stddev|0.05685643358850465| |min| 0.258| |max| 0.718| +---+---+ ``` The following is the test code to reproduce the result. 
```scala spark.sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed") val sparkSession = spark import sparkSession.implicits._ val df = spark.sparkContext.range(1, 3000).repartition(1).map { uid => val features = Array.fill(16000)(scala.math.random) (uid, scala.math.random, java.util.UUID.randomUUID().toString, java.util.UUID.randomUUID().toString, features) }.toDF("uid", "random", "uuid1", "uuid2", "features").cache() val size = df.count() // Write into ramdisk to rule out the disk IO impact val tempSaveDir = s"/Volumes/ramdisk/${java.util.UUID.randomUUID()}/" val n = 150 val writeTimes = new Array[Double](n) var i = 0 while (i < n) { val t1 = System.currentTimeMillis() df.write .format("com.databricks.spark.avro") .mode("overwrite") .save(tempSaveDir) val t2 = System.currentTimeMillis() writeTimes(i) = (t2 - t1) / 1000.0 i += 1 } df.unpersist() // The first 50 runs are for warm-up val readTimes = new Array[Double](n) i = 0 while (i < n) { val t1 = System.currentTimeMillis() val readDF = spark.read.format("com.databricks.spark.avro").load(tempSaveDir) assert(readDF.count() == size) val t2 = System.currentTimeMillis() readTimes(i) = (t2 - t1) / 1000.0 i += 1 } spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show() spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show() ``` ## How was this patch tested? Existing tests. Author: DB Tsai Author: Brian Lindblom Closes #21952 from dbtsai/avro-performance-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/273b2840 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/273b2840 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/273b2840 Branch: refs/heads/master Commit: 273b28404ca8bcdb07878be8fb0053e6625046bf Parents: 53ca975 Author: DB Tsai Authored: Fri Aug 3 07:43:54 2018 + Committer: DB Tsai Committed: Fri Aug 3 07:43:54 2018 +
spark git commit: [SPARK-25009][CORE] Standalone Cluster mode application submit is not working
Repository: spark Updated Branches: refs/heads/master ebf33a333 -> 53ca9755d [SPARK-25009][CORE] Standalone Cluster mode application submit is not working ## What changes were proposed in this pull request? It seems 'doRunMain()' was accidentally removed by another PR, and as a result the application submission does not happen; this PR adds back 'doRunMain()' for standalone cluster submission. ## How was this patch tested? I verified it manually by submitting applications in standalone cluster mode; with the change, all the applications are submitted to the Master. Author: Devaraj K Closes #21979 from devaraj-kavali/SPARK-25009. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53ca9755 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53ca9755 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53ca9755 Branch: refs/heads/master Commit: 53ca9755dbb3b952b16b198d31b7964d56bb5ef9 Parents: ebf33a3 Author: Devaraj K Authored: Fri Aug 3 07:23:56 2018 + Committer: DB Tsai Committed: Fri Aug 3 07:23:56 2018 + -- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/53ca9755/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e7310ee..6e70bcd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -181,6 +181,7 @@ private[spark] class SparkSubmit extends Logging { if (args.isStandaloneCluster && args.useRest) { try { logInfo("Running Spark using the REST application submission protocol.") +doRunMain() } catch { // Fail over to use the legacy submission gateway case e: SubmitRestConnectionException => - To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r28525 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_00_01-19a4531-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 07:16:59 2018 New Revision: 28525 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_00_01-19a4531 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25011][ML] add prefix to __all__ in fpm.py
Repository: spark Updated Branches: refs/heads/master 19a453191 -> ebf33a333 [SPARK-25011][ML] add prefix to __all__ in fpm.py ## What changes were proposed in this pull request? jira: https://issues.apache.org/jira/browse/SPARK-25011 add PrefixSpan to __all__ in fpm.py ## How was this patch tested? existing unit test. Author: Yuhao Yang Closes #21981 from hhbyyh/prefixall. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebf33a33 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebf33a33 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebf33a33 Branch: refs/heads/master Commit: ebf33a333e9f7ad46f37233eee843e31028a1d62 Parents: 19a4531 Author: Yuhao Yang Authored: Fri Aug 3 15:02:41 2018 +0800 Committer: hyukjinkwon Committed: Fri Aug 3 15:02:41 2018 +0800 -- python/pyspark/ml/fpm.py | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ebf33a33/python/pyspark/ml/fpm.py -- diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index fd19fd9..f939442 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -21,7 +21,7 @@ from pyspark.ml.util import * from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, _jvm from pyspark.ml.param.shared import * -__all__ = ["FPGrowth", "FPGrowthModel"] +__all__ = ["FPGrowth", "FPGrowthModel", "PrefixSpan"] class HasMinSupport(Params): @@ -313,14 +313,15 @@ class PrefixSpan(JavaParams): def findFrequentSequentialPatterns(self, dataset): """ .. note:: Experimental + Finds the complete set of frequent sequential patterns in the input sequences of itemsets. :param dataset: A dataframe containing a sequence column which is `ArrayType(ArrayType(T))` type, T is the item type for the input dataset. :return: A `DataFrame` that contains columns of sequence and corresponding frequency. 
The schema of it will be: - - `sequence: ArrayType(ArrayType(T))` (T is the item type) - - `freq: Long` + - `sequence: ArrayType(ArrayType(T))` (T is the item type) + - `freq: Long` >>> from pyspark.ml.fpm import PrefixSpan >>> from pyspark.sql import Row - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org