spark git commit: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL Queries
Repository: spark Updated Branches: refs/heads/master 41c2227a2 -> 36ea55e97 [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL Queries ## What changes were proposed in this pull request? Many Spark SQL users in my company have asked for a way to control the number of output files in Spark SQL. The users prefer not to use function repartition(n) or coalesce(n, shuffle) that require them to write and deploy Scala/Java/Python code. We propose adding the following Hive-style Coalesce and Repartition Hint to Spark SQL: ``` ... SELECT /*+ COALESCE(numPartitions) */ ... ... SELECT /*+ REPARTITION(numPartitions) */ ... ``` Multiple such hints are allowed. Multiple nodes are inserted into the logical plan, and the optimizer will pick the leftmost hint. ``` INSERT INTO s SELECT /*+ REPARTITION(100), COALESCE(500), COALESCE(10) */ * FROM t == Logical Plan == 'InsertIntoTable 'UnresolvedRelation `s`, false, false +- 'UnresolvedHint REPARTITION, [100] +- 'UnresolvedHint COALESCE, [500] +- 'UnresolvedHint COALESCE, [10] +- 'Project [*] +- 'UnresolvedRelation `t` == Optimized Logical Plan == InsertIntoHadoopFsRelationCommand ... +- Repartition 100, true +- HiveTableRelation ... ``` ## How was this patch tested? All unit tests. Manual tests using explain. Author: John Zhuge Closes #21911 from jzhuge/SPARK-24940. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36ea55e9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36ea55e9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36ea55e9 Branch: refs/heads/master Commit: 36ea55e97e609d25de5d8cd47ce8d2a7ae990d62 Parents: 41c2227 Author: John Zhuge Authored: Sat Aug 4 02:27:15 2018 -0400 Committer: Xiao Li Committed: Sat Aug 4 02:27:15 2018 -0400 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 1 + .../sql/catalyst/analysis/ResolveHints.scala| 28 .../catalyst/analysis/ResolveHintsSuite.scala | 35 .../sql/catalyst/parser/PlanParserSuite.scala | 27 +++ .../apache/spark/sql/DataFrameHintSuite.scala | 10 ++ .../org/apache/spark/sql/SQLQuerySuite.scala| 33 ++ 6 files changed, 134 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/36ea55e9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7f235ac..b5016fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -145,6 +145,7 @@ class Analyzer( lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, new ResolveHints.ResolveBroadcastHints(conf), + ResolveHints.ResolveCoalesceHints, ResolveHints.RemoveAllHints), Batch("Simple Sanity Check", Once, LookupFunctions), http://git-wip-us.apache.org/repos/asf/spark/blob/36ea55e9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index bfe5169..1ef482b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -20,10 +20,12 @@ package 
org.apache.spark.sql.catalyst.analysis import java.util.Locale import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.IntegerType /** @@ -103,6 +105,32 @@ object ResolveHints { } /** + * COALESCE Hint accepts name "COALESCE" and "REPARTITION". + * Its parameter includes a partition number. + */ + object ResolveCoalesceHints extends Rule[LogicalPlan] { +private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION") + +def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case h: UnresolvedHint if COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => +val hintName = h.name.toUpperCase(Locale.ROOT) +val shuffle = hintName mat
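A minimal usage sketch of the hint syntax proposed above, assuming an existing source table `t` and target table `s`; the partition counts are illustrative only:

```scala
// Cap the number of output files without a shuffle (COALESCE),
// or shuffle to an exact partition count (REPARTITION).
spark.sql("INSERT INTO s SELECT /*+ COALESCE(10) */ * FROM t")
spark.sql("INSERT INTO s SELECT /*+ REPARTITION(100) */ * FROM t")

// explain() shows the Repartition/Coalesce node the resolved hint inserts into the plan.
spark.sql("SELECT /*+ REPARTITION(100) */ * FROM t").explain()
```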
spark git commit: [SPARK-24722][SQL] pivot() with Column type argument
Repository: spark Updated Branches: refs/heads/master 4c27663cb -> 41c2227a2 [SPARK-24722][SQL] pivot() with Column type argument ## What changes were proposed in this pull request? In the PR, I propose column-based API for the `pivot()` function. It allows using of any column expressions as the pivot column. Also this makes it consistent with how groupBy() works. ## How was this patch tested? I added new tests to `DataFramePivotSuite` and updated PySpark examples for the `pivot()` function. Author: Maxim Gekk Closes #21699 from MaxGekk/pivot-column. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41c2227a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41c2227a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41c2227a Branch: refs/heads/master Commit: 41c2227a2318029709553a588e44dee28f106350 Parents: 4c27663 Author: Maxim Gekk Authored: Sat Aug 4 14:17:32 2018 +0800 Committer: hyukjinkwon Committed: Sat Aug 4 14:17:32 2018 +0800 -- python/pyspark/sql/group.py | 8 ++ .../spark/sql/RelationalGroupedDataset.scala| 100 +-- .../apache/spark/sql/DataFramePivotSuite.scala | 88 +--- .../org/apache/spark/sql/test/SQLTestData.scala | 12 +++ 4 files changed, 167 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/41c2227a/python/pyspark/sql/group.py -- diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py index 0906c9c..cc1da8e 100644 --- a/python/pyspark/sql/group.py +++ b/python/pyspark/sql/group.py @@ -211,6 +211,8 @@ class GroupedData(object): >>> df4.groupBy("year").pivot("course").sum("earnings").collect() [Row(year=2012, Java=2, dotNET=15000), Row(year=2013, Java=3, dotNET=48000)] +>>> df5.groupBy("sales.year").pivot("sales.course").sum("sales.earnings").collect() +[Row(year=2012, Java=2, dotNET=15000), Row(year=2013, Java=3, dotNET=48000)] """ if values is None: jgd = self._jgd.pivot(pivot_col) @@ -296,6 +298,12 @@ def _test(): Row(course="dotNET", year=2012, earnings=5000), Row(course="dotNET", year=2013, earnings=48000), Row(course="Java", year=2013, earnings=3)]).toDF() +globs['df5'] = sc.parallelize([ +Row(training="expert", sales=Row(course="dotNET", year=2012, earnings=1)), +Row(training="junior", sales=Row(course="Java", year=2012, earnings=2)), +Row(training="expert", sales=Row(course="dotNET", year=2012, earnings=5000)), +Row(training="junior", sales=Row(course="dotNET", year=2013, earnings=48000)), +Row(training="expert", sales=Row(course="Java", year=2013, earnings=3))]).toDF() (failure_count, test_count) = doctest.testmod( pyspark.sql.group, globs=globs, http://git-wip-us.apache.org/repos/asf/spark/blob/41c2227a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index 4e73b36..d700fb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -314,7 +314,67 @@ class RelationalGroupedDataset protected[sql]( * @param pivotColumn Name of the column to pivot. * @since 1.6.0 */ - def pivot(pivotColumn: String): RelationalGroupedDataset = { + def pivot(pivotColumn: String): RelationalGroupedDataset = pivot(Column(pivotColumn)) + + /** + * Pivots a column of the current `DataFrame` and performs the specified aggregation. 
+ * There are two versions of pivot function: one that requires the caller to specify the list + * of distinct values to pivot on, and one that does not. The latter is more concise but less + * efficient, because Spark needs to first compute the list of distinct values internally. + * + * {{{ + * // Compute the sum of earnings for each year by course with each course as a separate column + * df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings") + * + * // Or without specifying column values (less efficient) + * df.groupBy("year").pivot("course").sum("earnings") + * }}} + * + * @param pivotColumn Name of the column to pivot. + * @param values List of values that will be translated to columns in the output DataFrame. + * @since 1.6.0 +
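A hedged Scala sketch of the new Column-based `pivot` overload, assuming a DataFrame `df` with `year`, `course`, and `earnings` columns (the names used in the examples above); the lower-casing is only there to show that arbitrary expressions now work:

```scala
import org.apache.spark.sql.functions.{col, lower}

// pivot() previously took only a column name; with SPARK-24722 it also accepts
// a Column expression, consistent with how groupBy() works.
val pivoted = df.groupBy(col("year"))
  .pivot(lower(col("course")), Seq("dotnet", "java")) // pivot on an expression
  .sum("earnings")
pivoted.show()
```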
svn commit: r28552 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_20_02-4c27663-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Sat Aug 4 03:16:11 2018 New Revision: 28552 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_20_02-4c27663 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
Repository: spark Updated Branches: refs/heads/master 8c14276c3 -> 4c27663cb [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0 ## What changes were proposed in this pull request? Increase ZK timeout and harmonize configs across Kafka tests to resolve potentially flaky test failure ## How was this patch tested? Existing tests Author: Sean Owen Closes #21995 from srowen/SPARK-18057.3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c27663c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c27663c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c27663c Branch: refs/heads/master Commit: 4c27663cb20f3cde7317ffcb2c9d42257a40057f Parents: 8c14276 Author: Sean Owen Authored: Fri Aug 3 16:22:54 2018 -0700 Committer: Shixiong Zhu Committed: Fri Aug 3 16:22:54 2018 -0700 -- .../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 1 + .../org/apache/spark/streaming/kafka010/KafkaTestUtils.scala | 6 +- 2 files changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4c27663c/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 8229490..d89cccd 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -304,6 +304,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L props.put("port", brokerPort.toString) props.put("log.dir", Utils.createTempDir().getAbsolutePath) props.put("zookeeper.connect", zkAddress) +props.put("zookeeper.connection.timeout.ms", "6") props.put("log.flush.interval.messages", "1") props.put("replica.socket.timeout.ms", "1500") props.put("delete.topic.enable", "true") http://git-wip-us.apache.org/repos/asf/spark/blob/4c27663c/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala -- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 2315baf..eef4c55 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -32,6 +32,7 @@ import kafka.api.Request import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.ZkUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.serialization.StringSerializer import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} @@ -109,7 +110,7 @@ private[kafka010] class KafkaTestUtils extends Logging { brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) server = new KafkaServer(brokerConf) server.startup() - brokerPort = server.boundPort(brokerConf.interBrokerListenerName) + brokerPort = server.boundPort(new ListenerName("PLAINTEXT")) (server, brokerPort) }, new SparkConf(), "KafkaBroker") @@ -220,8 +221,11 @@ private[kafka010] class KafkaTestUtils extends Logging { props.put("port", brokerPort.toString)
props.put("log.dir", brokerLogDir) props.put("zookeeper.connect", zkAddress) +props.put("zookeeper.connection.timeout.ms", "6") props.put("log.flush.interval.messages", "1") props.put("replica.socket.timeout.ms", "1500") +props.put("delete.topic.enable", "true") +props.put("offsets.topic.num.partitions", "1") props.put("offsets.topic.replication.factor", "1") props.put("group.initial.rebalance.delay.ms", "10") props - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r28548 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_16_02-8c14276-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 23:15:56 2018 New Revision: 28548 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_16_02-8c14276 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: Little typo
Repository: spark Updated Branches: refs/heads/master 92b48842b -> 8c14276c3 Little typo ## What changes were proposed in this pull request? Fixed little typo for a comment ## How was this patch tested? Manual test Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Onwuka Gideon Closes #21992 from dongido001/patch-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c14276c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c14276c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c14276c Branch: refs/heads/master Commit: 8c14276c3362798b030db7a9fcdc31a10d04b643 Parents: 92b4884 Author: Onwuka Gideon Authored: Fri Aug 3 17:39:40 2018 -0500 Committer: Sean Owen Committed: Fri Aug 3 17:39:40 2018 -0500 -- python/pyspark/streaming/context.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8c14276c/python/pyspark/streaming/context.py -- diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index a451582..3fa57ca 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -222,7 +222,7 @@ class StreamingContext(object): Set each DStreams in this context to remember RDDs it generated in the last given duration. DStreams remember RDDs only for a limited duration of time and releases them for garbage collection. -This method allows the developer to specify how to long to remember +This method allows the developer to specify how long to remember the RDDs (if the developer wishes to query old data outside the DStream computation). @@ -287,7 +287,7 @@ class StreamingContext(object): def queueStream(self, rdds, oneAtATime=True, default=None): """ -Create an input stream from an queue of RDDs or list. In each batch, +Create an input stream from a queue of RDDs or list. In each batch, it will process either one or all of the RDDs returned by the queue. .. note:: Changes to the queue after the stream is created will not be recognized. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [PYSPARK] Updates to Accumulators
Repository: spark Updated Branches: refs/heads/branch-2.1 a3eb07db3 -> b2e0f68f6 [PYSPARK] Updates to Accumulators (cherry picked from commit 15fc2372269159ea2556b028d4eb8860c4108650) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2e0f68f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2e0f68f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2e0f68f Branch: refs/heads/branch-2.1 Commit: b2e0f68f615cbe2cf74f9813ece76c311fe8e911 Parents: a3eb07d Author: LucaCanali Authored: Wed Jul 18 23:19:02 2018 +0200 Committer: Imran Rashid Committed: Fri Aug 3 16:30:40 2018 -0500 -- .../org/apache/spark/api/python/PythonRDD.scala | 12 +++-- python/pyspark/accumulators.py | 53 +++- python/pyspark/context.py | 5 +- 3 files changed, 53 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b2e0f68f/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index b1190b9..de548e8 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -886,8 +886,9 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By */ private[spark] class PythonAccumulatorV2( @transient private val serverHost: String, -private val serverPort: Int) - extends CollectionAccumulator[Array[Byte]] { +private val serverPort: Int, +private val secretToken: String) + extends CollectionAccumulator[Array[Byte]] with Logging{ Utils.checkHost(serverHost, "Expected hostname") @@ -902,12 +903,17 @@ private[spark] class PythonAccumulatorV2( private def openSocket(): Socket = synchronized { if (socket == null || socket.isClosed) { socket = new Socket(serverHost, serverPort) + logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort") + // send the secret just for the initial authentication when opening a new connection + socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8)) } socket } // Need to override so the types match with PythonFunction - override def copyAndReset(): PythonAccumulatorV2 = new PythonAccumulatorV2(serverHost, serverPort) + override def copyAndReset(): PythonAccumulatorV2 = { +new PythonAccumulatorV2(serverHost, serverPort, secretToken) + } override def merge(other: AccumulatorV2[Array[Byte], JList[Array[Byte]]]): Unit = synchronized { val otherPythonAccumulator = other.asInstanceOf[PythonAccumulatorV2] http://git-wip-us.apache.org/repos/asf/spark/blob/b2e0f68f/python/pyspark/accumulators.py -- diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 6ef8cf5..bc0be07 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -228,20 +228,49 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): def handle(self): from pyspark.accumulators import _accumulatorRegistry -while not self.server.server_shutdown: -# Poll every 1 second for new data -- don't block in case of shutdown. 
-r, _, _ = select.select([self.rfile], [], [], 1) -if self.rfile in r: -num_updates = read_int(self.rfile) -for _ in range(num_updates): -(aid, update) = pickleSer._read_with_length(self.rfile) -_accumulatorRegistry[aid] += update -# Write a byte in acknowledgement -self.wfile.write(struct.pack("!b", 1)) +auth_token = self.server.auth_token + +def poll(func): +while not self.server.server_shutdown: +# Poll every 1 second for new data -- don't block in case of shutdown. +r, _, _ = select.select([self.rfile], [], [], 1) +if self.rfile in r: +if func(): +break + +def accum_updates(): +num_updates = read_int(self.rfile) +for _ in range(num_updates): +(aid, update) = pickleSer._read_with_length(self.rfile) +_accumulatorRegistry[aid] += update +# Write a byte in acknowledgement +self.wfile.write(struct.pack("!b", 1)) +return False + +def authenticate_and_accum_updates(): +received_token = self.rfile.read(len(auth_token)) +if isinstance(received_t
spark git commit: [PYSPARK] Updates to Accumulators
Repository: spark Updated Branches: refs/heads/branch-2.2 22ce8051f -> a5624c7ae [PYSPARK] Updates to Accumulators (cherry picked from commit 15fc2372269159ea2556b028d4eb8860c4108650) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5624c7a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5624c7a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5624c7a Branch: refs/heads/branch-2.2 Commit: a5624c7ae29d6d49117dd78642879bf978212d30 Parents: 22ce805 Author: LucaCanali Authored: Wed Jul 18 23:19:02 2018 +0200 Committer: Imran Rashid Committed: Fri Aug 3 14:49:38 2018 -0500 -- .../org/apache/spark/api/python/PythonRDD.scala | 12 +++-- python/pyspark/accumulators.py | 53 +++- python/pyspark/context.py | 5 +- 3 files changed, 53 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a5624c7a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 0662792..7b5a179 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -900,8 +900,9 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By */ private[spark] class PythonAccumulatorV2( @transient private val serverHost: String, -private val serverPort: Int) - extends CollectionAccumulator[Array[Byte]] { +private val serverPort: Int, +private val secretToken: String) + extends CollectionAccumulator[Array[Byte]] with Logging{ Utils.checkHost(serverHost, "Expected hostname") @@ -916,12 +917,17 @@ private[spark] class PythonAccumulatorV2( private def openSocket(): Socket = synchronized { if (socket == null || socket.isClosed) { socket = new Socket(serverHost, serverPort) + logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort") + // send the secret just for the initial authentication when opening a new connection + socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8)) } socket } // Need to override so the types match with PythonFunction - override def copyAndReset(): PythonAccumulatorV2 = new PythonAccumulatorV2(serverHost, serverPort) + override def copyAndReset(): PythonAccumulatorV2 = { +new PythonAccumulatorV2(serverHost, serverPort, secretToken) + } override def merge(other: AccumulatorV2[Array[Byte], JList[Array[Byte]]]): Unit = synchronized { val otherPythonAccumulator = other.asInstanceOf[PythonAccumulatorV2] http://git-wip-us.apache.org/repos/asf/spark/blob/a5624c7a/python/pyspark/accumulators.py -- diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 6ef8cf5..bc0be07 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -228,20 +228,49 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): def handle(self): from pyspark.accumulators import _accumulatorRegistry -while not self.server.server_shutdown: -# Poll every 1 second for new data -- don't block in case of shutdown. 
-r, _, _ = select.select([self.rfile], [], [], 1) -if self.rfile in r: -num_updates = read_int(self.rfile) -for _ in range(num_updates): -(aid, update) = pickleSer._read_with_length(self.rfile) -_accumulatorRegistry[aid] += update -# Write a byte in acknowledgement -self.wfile.write(struct.pack("!b", 1)) +auth_token = self.server.auth_token + +def poll(func): +while not self.server.server_shutdown: +# Poll every 1 second for new data -- don't block in case of shutdown. +r, _, _ = select.select([self.rfile], [], [], 1) +if self.rfile in r: +if func(): +break + +def accum_updates(): +num_updates = read_int(self.rfile) +for _ in range(num_updates): +(aid, update) = pickleSer._read_with_length(self.rfile) +_accumulatorRegistry[aid] += update +# Write a byte in acknowledgement +self.wfile.write(struct.pack("!b", 1)) +return False + +def authenticate_and_accum_updates(): +received_token = self.rfile.read(len(auth_token)) +if isinstance(received_t
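For reference, a schematic Scala restatement of the handshake this patch adds on the JVM side: when `PythonAccumulatorV2` opens a new connection to the Python accumulator server, the very first bytes written are the secret token, and only then do accumulator updates flow. Host, port, and token values here are placeholders.

```scala
import java.net.Socket
import java.nio.charset.StandardCharsets

// Open a connection and authenticate it by sending the shared secret first,
// mirroring the openSocket() change in PythonAccumulatorV2.
def openAuthenticatedSocket(host: String, port: Int, secretToken: String): Socket = {
  val socket = new Socket(host, port)
  socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8))
  socket
}
```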
svn commit: r28538 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_12_01-92b4884-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 19:16:09 2018 New Revision: 28538 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_12_01-92b4884 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24954][CORE] Fail fast on job submit if run a barrier stage with dynamic resource allocation enabled
Repository: spark Updated Branches: refs/heads/master c32dbd6bd -> 92b48842b [SPARK-24954][CORE] Fail fast on job submit if run a barrier stage with dynamic resource allocation enabled ## What changes were proposed in this pull request? We don't support run a barrier stage with dynamic resource allocation enabled, it shall lead to some confusing behaviors (eg. with dynamic resource allocation enabled, it may happen that we acquire some executors (but not enough to launch all the tasks in a barrier stage) and later release them due to executor idle time expire, and then acquire again). We perform the check on job submit and fail fast if running a barrier stage with dynamic resource allocation enabled. ## How was this patch tested? Added new test suite `BarrierStageOnSubmittedSuite` to cover all the fail fast cases that submitted a job containing one or more barrier stages. Author: Xingbo Jiang Closes #21915 from jiangxb1987/SPARK-24954. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92b48842 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92b48842 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92b48842 Branch: refs/heads/master Commit: 92b48842b944a3e430472294cdc3c481bad6b804 Parents: c32dbd6 Author: Xingbo Jiang Authored: Fri Aug 3 09:36:56 2018 -0700 Committer: Xiangrui Meng Committed: Fri Aug 3 09:36:56 2018 -0700 -- .../apache/spark/scheduler/DAGScheduler.scala | 25 + .../spark/BarrierStageOnSubmittedSuite.scala| 57 2 files changed, 71 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/92b48842/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3dd0718..cf1fcbc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -364,6 +364,7 @@ class DAGScheduler( */ def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd +checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions) val numTasks = rdd.partitions.length val parents = getOrCreateParentStages(rdd, jobId) @@ -385,6 +386,23 @@ class DAGScheduler( } /** + * We don't support run a barrier stage with dynamic resource allocation enabled, it shall lead + * to some confusing behaviors (eg. with dynamic resource allocation enabled, it may happen that + * we acquire some executors (but not enough to launch all the tasks in a barrier stage) and + * later release them due to executor idle time expire, and then acquire again). + * + * We perform the check on job submit and fail fast if running a barrier stage with dynamic + * resource allocation enabled. + * + * TODO SPARK-24942 Improve cluster resource management with jobs containing barrier stage + */ + private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = { +if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) { + throw new SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION) +} + } + + /** * Create a ResultStage associated with the provided jobId. 
*/ private def createResultStage( @@ -393,6 +411,7 @@ class DAGScheduler( partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { +checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() @@ -2001,4 +2020,10 @@ private[spark] object DAGScheduler { "PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " + "(scala) or barrierRdd.collect()[0] (python).\n" + "2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))." + + // Error message when running a barrier stage with dynamic resource allocation enabled. + val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION = +"[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " + + "now. You can disable dynamic resource allocation by setting Spark conf " + + "\"spark.dynamicAllocation.enabled\" to \"false\"." } http://git-wip-us.apache.org/repos/asf/spark/blob/92b48842/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
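A hedged sketch of how the new fail-fast check surfaces to a user, assuming a SparkContext `sc` in an application running with `spark.dynamicAllocation.enabled=true`; the barrier stage below is otherwise a no-op:

```scala
import org.apache.spark.SparkException

// The barrier() + mapPartitions() chain marks the stage as a barrier stage.
val barrierRdd = sc.parallelize(1 to 100, numSlices = 4)
  .barrier()
  .mapPartitions(iter => iter)

try {
  barrierRdd.collect()
} catch {
  case e: SparkException =>
    // With dynamic allocation enabled, the job now fails at submission with a
    // message pointing at spark.dynamicAllocation.enabled.
    println(e.getMessage)
}
```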
svn commit: r28535 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_08_02-c32dbd6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 15:15:56 2018 New Revision: 28535 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_08_02-c32dbd6 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
Repository: spark Updated Branches: refs/heads/master 273b28404 -> c32dbd6bd [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0 ## What changes were proposed in this pull request? Update to kafka 2.0.0 in streaming-kafka module, and remove override for Scala 2.12. It won't compile for 2.12 otherwise. ## How was this patch tested? Existing tests. Author: Sean Owen Closes #21955 from srowen/SPARK-18057.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c32dbd6b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c32dbd6b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c32dbd6b Branch: refs/heads/master Commit: c32dbd6bd55cdff4d73408ba5fd6fe18056048fe Parents: 273b284 Author: Sean Owen Authored: Fri Aug 3 08:17:18 2018 -0500 Committer: Sean Owen Committed: Fri Aug 3 08:17:18 2018 -0500 -- external/kafka-0-10-sql/pom.xml | 10 +- external/kafka-0-10/pom.xml | 26 ++-- .../streaming/kafka010/KafkaRDDSuite.scala | 32 .../streaming/kafka010/KafkaTestUtils.scala | 12 .../kafka010/mocks/MockScheduler.scala | 3 +- .../streaming/kafka010/mocks/MockTime.scala | 10 +++--- 6 files changed, 50 insertions(+), 43 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10-sql/pom.xml -- diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 9550003..8588e8b 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -29,6 +29,7 @@ spark-sql-kafka-0-10_2.11 sql-kafka-0-10 + 2.0.0 jar @@ -128,13 +129,4 @@ target/scala-${scala.binary.version}/test-classes - - - scala-2.12 - -0.10.1.1 - - - - http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/pom.xml -- diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index 3b124b2..a97fd35 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -28,7 +28,8 @@ spark-streaming-kafka-0-10_2.11 streaming-kafka-0-10 -0.10.0.1 + +2.0.0 jar Spark Integration for Kafka 0.10 @@ -58,6 +59,20 @@ kafka_${scala.binary.version} ${kafka.version} test + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-annotations + + net.sf.jopt-simple @@ -93,13 +108,4 @@ target/scala-${scala.binary.version}/test-classes - - - scala-2.12 - -0.10.1.1 - - - - http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala -- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 271adea..3ac6509 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -23,11 +23,11 @@ import java.io.File import scala.collection.JavaConverters._ import scala.util.Random -import kafka.common.TopicAndPartition -import kafka.log._ -import kafka.message._ +import kafka.log.{CleanerConfig, Log, LogCleaner, LogConfig, ProducerStateManager} +import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.utils.Pool import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import 
org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.BeforeAndAfterAll @@ -72,33 +72,39 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { val mockTime = new MockTime() -// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api -val logs = new Pool[TopicAndPartition, Log]() +val logs = new Pool[TopicPartition, Log]() val logDir = kafkaTestUtils.brokerLogDir val dir = new File(logDir, topic + "-" + partition) dir.mkdirs() val logProps = new ju.Properties() lo
svn commit: r28529 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_04_02-273b284-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 11:20:42 2018 New Revision: 28529 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_04_02-273b284 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24993][SQL] Make Avro Fast Again
Repository: spark Updated Branches: refs/heads/master 53ca9755d -> 273b28404 [SPARK-24993][SQL] Make Avro Fast Again ## What changes were proposed in this pull request? When lindblombr at apple developed [SPARK-24855](https://github.com/apache/spark/pull/21847) to support specified schema on write, we found a performance regression in Avro writer for our dataset. With this PR, the performance is improved, but not as good as Spark 2.3 + the old avro writer. There must be something we miss which we need to investigate further. Spark 2.4 ``` spark git:(master) ./build/mvn -DskipTests clean package spark git:(master) bin/spark-shell --jars external/avro/target/spark-avro_2.11-2.4.0-SNAPSHOT.jar ``` Spark 2.3 + databricks avro ``` spark git:(branch-2.3) ./build/mvn -DskipTests clean package spark git:(branch-2.3) bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0 ``` Current master: ``` +---++ |summary| writeTimes| +---++ | count| 100| | mean| 2.95621| | stddev|0.030895815479469294| |min| 2.915| |max| 3.049| +---++ +---++ |summary| readTimes| +---++ | count| 100| | mean| 0.310729995| | stddev|0.054139709842390006| |min| 0.259| |max| 0.692| +---++ ``` Current master with this PR: ``` +---++ |summary| writeTimes| +---++ | count| 100| | mean| 2.58043002| | stddev|0.011175600225672079| |min| 2.558| |max|2.62| +---++ +---++ |summary| readTimes| +---++ | count| 100| | mean| 0.299220004| | stddev|0.058261961532514166| |min| 0.251| |max| 0.732| +---++ ``` Spark 2.3 + databricks avro: ``` +---++ |summary| writeTimes| +---++ | count| 100| | mean| 1.77305005| | stddev|0.025199156230863575| |min| 1.729| |max| 1.833| +---++ +---+---+ |summary| readTimes| +---+---+ | count|100| | mean|0.29715| | stddev|0.05685643358850465| |min| 0.258| |max| 0.718| +---+---+ ``` The following is the test code to reproduce the result. ```scala spark.sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed") val sparkSession = spark import sparkSession.implicits._ val df = spark.sparkContext.range(1, 3000).repartition(1).map { uid => val features = Array.fill(16000)(scala.math.random) (uid, scala.math.random, java.util.UUID.randomUUID().toString, java.util.UUID.randomUUID().toString, features) }.toDF("uid", "random", "uuid1", "uuid2", "features").cache() val size = df.count() // Write into ramdisk to rule out the disk IO impact val tempSaveDir = s"/Volumes/ramdisk/${java.util.UUID.randomUUID()}/" val n = 150 val writeTimes = new Array[Double](n) var i = 0 while (i < n) { val t1 = System.currentTimeMillis() df.write .format("com.databricks.spark.avro") .mode("overwrite") .save(tempSaveDir) val t2 = System.currentTimeMillis() writeTimes(i) = (t2 - t1) / 1000.0 i += 1 } df.unpersist() // The first 50 runs are for warm-up val readTimes = new Array[Double](n) i = 0 while (i < n) { val t1 = System.currentTimeMillis() val readDF = spark.read.format("com.databricks.spark.avro").load(tempSaveDir) assert(readDF.count() == size) val t2 = System.currentTimeMillis() readTimes(i) = (t2 - t1) / 1000.0 i += 1 } spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show() spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show() ``` ## How was this patch tested? Existing tests. Author: DB Tsai Author: Brian Lindblom Closes #21952 from dbtsai/avro-performance-fix. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/273b2840 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/273b2840 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/273b2840 Branch: refs/heads/master Commit: 273b28404ca8bcdb07878be8fb0053e6625046bf Parents: 53ca975 Author: DB Tsai Authored: Fri Aug 3 07:43:54 2018 + Committer: DB Tsai Committed: Fri Aug 3 07:43:54 2018 + --
spark git commit: [SPARK-25009][CORE] Standalone Cluster mode application submit is not working
Repository: spark Updated Branches: refs/heads/master ebf33a333 -> 53ca9755d [SPARK-25009][CORE] Standalone Cluster mode application submit is not working ## What changes were proposed in this pull request? It seems 'doRunMain()' was accidentally removed by another PR, and as a result application submission no longer happens; this PR adds 'doRunMain()' back for standalone cluster submission. ## How was this patch tested? I verified it manually by submitting applications in standalone cluster mode; with the change, all applications are submitted to the Master. Author: Devaraj K Closes #21979 from devaraj-kavali/SPARK-25009. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53ca9755 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53ca9755 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53ca9755 Branch: refs/heads/master Commit: 53ca9755dbb3b952b16b198d31b7964d56bb5ef9 Parents: ebf33a3 Author: Devaraj K Authored: Fri Aug 3 07:23:56 2018 + Committer: DB Tsai Committed: Fri Aug 3 07:23:56 2018 + -- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/53ca9755/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e7310ee..6e70bcd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -181,6 +181,7 @@ private[spark] class SparkSubmit extends Logging { if (args.isStandaloneCluster && args.useRest) { try { logInfo("Running Spark using the REST application submission protocol.") +doRunMain() } catch { // Fail over to use the legacy submission gateway case e: SubmitRestConnectionException =>
svn commit: r28525 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_03_00_01-19a4531-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 3 07:16:59 2018 New Revision: 28525 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_03_00_01-19a4531 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25011][ML] add prefix to __all__ in fpm.py
Repository: spark Updated Branches: refs/heads/master 19a453191 -> ebf33a333 [SAPRK-25011][ML] add prefix to __all__ in fpm.py ## What changes were proposed in this pull request? jira: https://issues.apache.org/jira/browse/SPARK-25011 add prefix to __all__ in fpm.py ## How was this patch tested? existing unit test. Author: Yuhao Yang Closes #21981 from hhbyyh/prefixall. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebf33a33 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebf33a33 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebf33a33 Branch: refs/heads/master Commit: ebf33a333e9f7ad46f37233eee843e31028a1d62 Parents: 19a4531 Author: Yuhao Yang Authored: Fri Aug 3 15:02:41 2018 +0800 Committer: hyukjinkwon Committed: Fri Aug 3 15:02:41 2018 +0800 -- python/pyspark/ml/fpm.py | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ebf33a33/python/pyspark/ml/fpm.py -- diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index fd19fd9..f939442 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -21,7 +21,7 @@ from pyspark.ml.util import * from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, _jvm from pyspark.ml.param.shared import * -__all__ = ["FPGrowth", "FPGrowthModel"] +__all__ = ["FPGrowth", "FPGrowthModel", "PrefixSpan"] class HasMinSupport(Params): @@ -313,14 +313,15 @@ class PrefixSpan(JavaParams): def findFrequentSequentialPatterns(self, dataset): """ .. note:: Experimental + Finds the complete set of frequent sequential patterns in the input sequences of itemsets. :param dataset: A dataframe containing a sequence column which is `ArrayType(ArrayType(T))` type, T is the item type for the input dataset. :return: A `DataFrame` that contains columns of sequence and corresponding frequency. The schema of it will be: - - `sequence: ArrayType(ArrayType(T))` (T is the item type) - - `freq: Long` + - `sequence: ArrayType(ArrayType(T))` (T is the item type) + - `freq: Long` >>> from pyspark.ml.fpm import PrefixSpan >>> from pyspark.sql import Row - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
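For context, a hedged Scala sketch of the `PrefixSpan` API whose Python docstring is touched above (the Scala class backs the PySpark wrapper); the toy sequences and parameter values are illustrative only, and `spark` is assumed to be an active SparkSession:

```scala
import org.apache.spark.ml.fpm.PrefixSpan

// One row per sequence; the column is an array of itemsets (array<array<int>> here).
val sequences = spark.createDataFrame(Seq(
  Tuple1(Seq(Seq(1, 2), Seq(3))),
  Tuple1(Seq(Seq(1), Seq(3, 2), Seq(1, 2)))
)).toDF("sequence")

val patterns = new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5)
  .findFrequentSequentialPatterns(sequences) // returns columns: sequence, freq
patterns.show(truncate = false)
```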