spark git commit: [SPARK-25133][SQL][DOC] Avro data source guide
Repository: spark Updated Branches: refs/heads/master 1747469a1 -> 05974f943 [SPARK-25133][SQL][DOC] Avro data source guide ## What changes were proposed in this pull request? Create documentation for the Avro data source. The new page will be linked in https://spark.apache.org/docs/latest/sql-programming-guide.html For a preview, please unzip the following file: [AvroDoc.zip](https://github.com/apache/spark/files/2313011/AvroDoc.zip) Closes #22121 from gengliangwang/avroDoc. Authored-by: Gengliang Wang Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05974f94 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05974f94 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05974f94 Branch: refs/heads/master Commit: 05974f9431e9718a5f331a9892b7d81aca8387a6 Parents: 1747469 Author: Gengliang Wang Authored: Thu Aug 23 13:45:49 2018 +0800 Committer: Wenchen Fan Committed: Thu Aug 23 13:45:49 2018 +0800 -- docs/avro-data-source-guide.md | 380 docs/sql-programming-guide.md | 3 + 2 files changed, 383 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/05974f94/docs/avro-data-source-guide.md -- diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md new file mode 100644 index 000..d3b81f0 --- /dev/null +++ b/docs/avro-data-source-guide.md @@ -0,0 +1,380 @@ +--- +layout: global +title: Apache Avro Data Source Guide +--- + +* This will become a table of contents (this text will be scraped). +{:toc} + +Since the Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides built-in support for reading and writing Apache Avro data. + +## Deploying +The `spark-avro` module is external and not included in `spark-submit` or `spark-shell` by default. + +As with any Spark application, `spark-submit` is used to launch your application. `spark-avro_{{site.SCALA_BINARY_VERSION}}` +and its dependencies can be directly added to `spark-submit` using `--packages`, such as: + +./bin/spark-submit --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + +For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly: + +./bin/spark-shell --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + +See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Load and Save Functions + +Since the `spark-avro` module is external, there is no `.avro` API in +`DataFrameReader` or `DataFrameWriter`. + +To load/save data in Avro format, you need to specify the data source option `format` as `avro` (or `org.apache.spark.sql.avro`). 
+ + +{% highlight scala %} + +val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro") +usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") + +{% endhighlight %} + + +{% highlight java %} + +Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro"); +usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro"); + +{% endhighlight %} + + +{% highlight python %} + +df = spark.read.format("avro").load("examples/src/main/resources/users.avro") +df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") + +{% endhighlight %} + + +{% highlight r %} + +df <- read.df("examples/src/main/resources/users.avro", "avro") +write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") + +{% endhighlight %} + + + +## to_avro() and from_avro() +The Avro package provides the function `to_avro()` to encode a column as binary in Avro +format, and `from_avro()` to decode Avro binary data into a column. Both functions transform one column to +another column, and the input/output SQL data type can be a complex type or a primitive type. + +Using Avro records as columns is useful when reading from or writing to a streaming source like Kafka. Each +Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc. +* If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file. +* `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.
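A minimal Scala sketch of that streaming pattern, along the lines of the example shipped with the new guide. The schema file (`user.avsc`), the Kafka bootstrap server (`host1:port1`), the topic names, and the checkpoint path are placeholders, and an active `SparkSession` named `spark` (e.g. from `spark-shell`) is assumed:

{% highlight scala %}
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

// `from_avro` needs the Avro schema as a JSON-format string; here it is read from a file.
val jsonFormatSchema = new String(
  Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .load()

// Decode the Kafka value from Avro, filter on a decoded field, then re-encode the
// projection back to Avro before pushing it to another topic.
val output = input
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
  .where("user.favorite_color == \"red\"")
  .select(to_avro(col("user.name")).as("value"))

val query = output.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("topic", "topic2")
  .option("checkpointLocation", "/tmp/avro-kafka-checkpoint") // required for the Kafka sink
  .start()
{% endhighlight %}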
svn commit: r28901 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_22_20_01-1747469-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Aug 23 03:15:47 2018 New Revision: 28901 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_22_20_01-1747469 docs [This commit notification would consist of 1477 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25167][SPARKR][TEST][MINOR] Minor fixes for R sql tests
Repository: spark Updated Branches: refs/heads/master 0295ad40d -> 1747469a1 [SPARK-25167][SPARKR][TEST][MINOR] Minor fixes for R sql tests ## What changes were proposed in this pull request? A few SQL tests for R were failing in my development environment. In this PR, I am attempting to address some of them. Below are the reasons for the failures. - The catalog API tests assume catalog artifacts named "foo" to be non-existent. I think names such as foo and bar are common, and I use them frequently. I have changed it to a string that I hope is less likely to collide. - One test assumes that we only have one database in the system. I had more than one and it caused the test to fail. I have changed that check. - One more test, which compares two timestamp values, fails - I am debugging this now. I will send it as a follow-up, maybe. ## How was this patch tested? It's a test fix. Closes #22161 from dilipbiswal/r-sql-test-fix1. Authored-by: Dilip Biswal Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1747469a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1747469a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1747469a Branch: refs/heads/master Commit: 1747469a1ff0b0ab6c5545fe6de63ffe42660580 Parents: 0295ad4 Author: Dilip Biswal Authored: Thu Aug 23 10:56:17 2018 +0800 Committer: hyukjinkwon Committed: Thu Aug 23 10:56:17 2018 +0800 -- R/pkg/tests/fulltests/test_sparkSQL.R | 20 +++- 1 file changed, 11 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1747469a/R/pkg/tests/fulltests/test_sparkSQL.R -- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index bff6e35..e1f3cf3 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -734,8 +734,8 @@ test_that("test cache, uncache and clearCache", { clearCache() expect_true(dropTempView("table1")) - expect_error(uncacheTable("foo"), - "Error in uncacheTable : analysis error - Table or view not found: foo") + expect_error(uncacheTable("zxwtyswklpf"), + "Error in uncacheTable : analysis error - Table or view not found: zxwtyswklpf") }) test_that("insertInto() on a registered table", { @@ -3632,11 +3632,11 @@ test_that("Collect on DataFrame when NAs exists at the top of a timestamp column test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", { expect_equal(currentDatabase(), "default") expect_error(setCurrentDatabase("default"), NA) - expect_error(setCurrentDatabase("foo"), - "Error in setCurrentDatabase : analysis error - Database 'foo' does not exist") + expect_error(setCurrentDatabase("zxwtyswklpf"), +"Error in setCurrentDatabase : analysis error - Database 'zxwtyswklpf' does not exist") dbs <- collect(listDatabases()) expect_equal(names(dbs), c("name", "description", "locationUri")) - expect_equal(dbs[[1]], "default") + expect_equal(which(dbs[, 1] == "default"), 1) }) test_that("catalog APIs, listTables, listColumns, listFunctions", { @@ -3659,8 +3659,9 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", { expect_equal(colnames(c), c("name", "description", "dataType", "nullable", "isPartition", "isBucket")) expect_equal(collect(c)[[1]][[1]], "speed") - expect_error(listColumns("foo", "default"), - "Error in listColumns : analysis error - Table 'foo' does not exist in database 'default'") + expect_error(listColumns("zxwtyswklpf", "default"), + paste("Error in listColumns : analysis error - 
Table", + "'zxwtyswklpf' does not exist in database 'default'")) f <- listFunctions() expect_true(nrow(f) >= 200) # 250 @@ -3668,8 +3669,9 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", { c("name", "database", "description", "className", "isTemporary")) expect_equal(take(orderBy(f, "className"), 1)$className, "org.apache.spark.sql.catalyst.expressions.Abs") - expect_error(listFunctions("foo_db"), - "Error in listFunctions : analysis error - Database 'foo_db' does not exist") + expect_error(listFunctions("zxwtyswklpf_db"), + paste("Error in listFunctions : analysis error - Database", + "'zxwtyswklpf_db' does not exist")) # recoverPartitions does not work with tempory view expect_error(recoverPartitions("cars"), - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25127] DataSourceV2: Remove SupportsPushDownCatalystFilters
Repository: spark Updated Branches: refs/heads/master 2bc7b7553 -> 0295ad40d [SPARK-25127] DataSourceV2: Remove SupportsPushDownCatalystFilters ## What changes were proposed in this pull request? They depend on internal Expression APIs. Let's see how far we can get without it. ## How was this patch tested? Just some code removal. There's no existing tests as far as I can tell so it's easy to remove. Closes #22185 from rxin/SPARK-25127. Authored-by: Reynold Xin Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0295ad40 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0295ad40 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0295ad40 Branch: refs/heads/master Commit: 0295ad40def41b9a8ccefaaa1a7658899fb632a4 Parents: 2bc7b75 Author: Reynold Xin Authored: Thu Aug 23 08:10:45 2018 +0800 Committer: Wenchen Fan Committed: Thu Aug 23 08:10:45 2018 +0800 -- .../reader/SupportsPushDownCatalystFilters.java | 57 .../v2/reader/SupportsPushDownFilters.java | 4 -- .../datasources/v2/DataSourceV2Strategy.scala | 5 -- 3 files changed, 66 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0295ad40/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java deleted file mode 100644 index 9d79a18..000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.catalyst.expressions.Expression; - -/** - * A mix-in interface for {@link ScanConfigBuilder}. Data source readers can implement this - * interface to push down arbitrary expressions as predicates to the data source. - * This is an experimental and unstable interface as {@link Expression} is not public and may get - * changed in the future Spark versions. - * - * Note that, if data source readers implement both this interface and - * {@link SupportsPushDownFilters}, Spark will ignore {@link SupportsPushDownFilters} and only - * process this interface. - */ -@InterfaceStability.Unstable -public interface SupportsPushDownCatalystFilters extends ScanConfigBuilder { - - /** - * Pushes down filters, and returns filters that need to be evaluated after scanning. 
- */ - Expression[] pushCatalystFilters(Expression[] filters); - - /** - * Returns the catalyst filters that are pushed to the data source via - * {@link #pushCatalystFilters(Expression[])}. - * - * There are 3 kinds of filters: - * 1. pushable filters which don't need to be evaluated again after scanning. - * 2. pushable filters which still need to be evaluated after scanning, e.g. parquet - * row group filter. - * 3. non-pushable filters. - * Both case 1 and 2 should be considered as pushed filters and should be returned by this method. - * - * It's possible that there is no filters in the query and - * {@link #pushCatalystFilters(Expression[])} is never called, empty array should be returned for - * this case. - */ - Expression[] pushedCatalystFilters(); -} http://git-wip-us.apache.org/repos/asf/spark/blob/0295ad40/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java index 5d32a8a..5e7985f 100644 ---
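With the Catalyst-expression variant removed, sources push predicates through the remaining `Filter`-based interface (`SupportsPushDownFilters`). A rough, hypothetical Scala sketch of that contract follows — `FilterPushdown` below is a stand-in trait mirroring the two method names, not the real interface, since the enclosing builder API was still in flux at this point:

{% highlight scala %}
import org.apache.spark.sql.sources.{Filter, IsNotNull}

// Hypothetical stand-in for the public pushdown contract: sources receive
// data-source Filters (not Catalyst Expressions), keep what they can evaluate
// natively, and hand the rest back to Spark for post-scan evaluation.
trait FilterPushdown {
  def pushFilters(filters: Array[Filter]): Array[Filter]
  def pushedFilters(): Array[Filter]
}

class ExampleScanBuilder extends FilterPushdown {
  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // Pretend this source can only evaluate IS NOT NULL predicates natively.
    val (supported, rejected) = filters.partition(_.isInstanceOf[IsNotNull])
    pushed = supported
    rejected // Spark must still evaluate these after the scan.
  }

  override def pushedFilters(): Array[Filter] = pushed
}
{% endhighlight %}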
svn commit: r28899 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_22_16_02-49a1993-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Aug 22 23:16:07 2018 New Revision: 28899 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_22_16_02-49a1993 docs [This commit notification would consist of 1478 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24785][SHELL] Making sure REPL prints Spark UI info and then Welcome message
Repository: spark Updated Branches: refs/heads/master 49a1993b1 -> 2bc7b7553 [SPARK-24785][SHELL] Making sure REPL prints Spark UI info and then Welcome message ## What changes were proposed in this pull request? After https://github.com/apache/spark/pull/21495 the welcome message is printed first, and then Scala prompt will be shown before the Spark UI info is printed. Although it's a minor issue, but visually, it doesn't look as nice as the existing behavior. This PR intends to fix it by duplicating the Scala `process` code to arrange the printing order. However, one variable is private, so reflection has to be used which is not desirable. We can use this PR to brainstorm how to handle it properly and how Scala can change their APIs to fit our need. ## How was this patch tested? Existing test Closes #21749 from dbtsai/repl-followup. Authored-by: DB Tsai Signed-off-by: DB Tsai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bc7b755 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bc7b755 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bc7b755 Branch: refs/heads/master Commit: 2bc7b75537ec81184048738883b282e257cc58de Parents: 49a1993 Author: DB Tsai Authored: Wed Aug 22 23:14:56 2018 + Committer: DB Tsai Committed: Wed Aug 22 23:14:56 2018 + -- .../org/apache/spark/repl/SparkILoop.scala | 138 ++- .../spark/repl/SparkILoopInterpreter.scala | 18 +-- 2 files changed, 138 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2bc7b755/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala -- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala index a44051b..9426526 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -22,8 +22,16 @@ import java.io.BufferedReader // scalastyle:off println import scala.Predef.{println => _, _} // scalastyle:on println +import scala.concurrent.Future +import scala.reflect.classTag +import scala.reflect.internal.util.ScalaClassLoader.savingContextLoader +import scala.reflect.io.File +import scala.tools.nsc.{GenericRunnerSettings, Properties} import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter.{ILoop, JPrintWriter} +import scala.tools.nsc.interpreter.{isReplDebug, isReplPower, replProps} +import scala.tools.nsc.interpreter.{AbstractOrMissingHandler, ILoop, IMain, JPrintWriter} +import scala.tools.nsc.interpreter.{NamedParam, SimpleReader, SplashLoop, SplashReader} +import scala.tools.nsc.interpreter.StdReplTags.tagOfIMain import scala.tools.nsc.util.stringFromStream import scala.util.Properties.{javaVersion, javaVmName, versionString} @@ -36,7 +44,7 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) def this() = this(None, new JPrintWriter(Console.out, true)) override def createInterpreter(): Unit = { -intp = new SparkILoopInterpreter(settings, out, initializeSpark) +intp = new SparkILoopInterpreter(settings, out) } val initializationCommands: Seq[String] = Seq( @@ -116,6 +124,132 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) super.replay() } + /** + * The following code is mostly a copy of `process` implementation in `ILoop.scala` in Scala + * + * In newer version of Scala, `printWelcome` is the first thing to be called. 
As a result, + * SparkUI URL information would be always shown after the welcome message. + * + * However, this is inconsistent compared with the existing version of Spark which will always + * show SparkUI URL first. + * + * The only way we can make it consistent will be duplicating the Scala code. + * + * We should remove this duplication once Scala provides a way to load our custom initialization + * code, and also customize the ordering of printing welcome message. + */ + override def process(settings: Settings): Boolean = savingContextLoader { + +def newReader = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true)) + +/** Reader to use before interpreter is online. */ +def preLoop = { + val sr = SplashReader(newReader) { r => +in = r +in.postInit() + } + in = sr + SplashLoop(sr, prompt) +} + +/* Actions to cram in parallel while collecting first user input at prompt. + * Run with output muted both from ILoop and from the intp reporter. + */ +def loopPostInit(): Unit = mumly { + // Bind intp somewhere out of the regular
spark git commit: [SPARK-25163][SQL] Fix flaky test: o.a.s.util.collection.ExternalAppendOnlyMapSuiteCheck
Repository: spark Updated Branches: refs/heads/master 310632498 -> 49a1993b1 [SPARK-25163][SQL] Fix flaky test: o.a.s.util.collection.ExternalAppendOnlyMapSuiteCheck ## What changes were proposed in this pull request? `ExternalAppendOnlyMapSuiteCheck` test is flaky. We use a `SparkListener` to collect spill metrics of completed stages. `withListener` runs the code that does spill. Spill status was checked after the code finishes but it was still in `withListener`. At that time it was possibly not all events to the listener bus are processed. We should check spill status after all events are processed. ## How was this patch tested? Locally ran unit tests. Closes #22181 from viirya/SPARK-25163. Authored-by: Liang-Chi Hsieh Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49a1993b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49a1993b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49a1993b Branch: refs/heads/master Commit: 49a1993b168accb6f188c682546f12ea568173c4 Parents: 3106324 Author: Liang-Chi Hsieh Authored: Wed Aug 22 14:17:05 2018 -0700 Committer: Shixiong Zhu Committed: Wed Aug 22 14:17:05 2018 -0700 -- core/src/main/scala/org/apache/spark/TestUtils.scala | 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/49a1993b/core/src/main/scala/org/apache/spark/TestUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 6cc8fe1..c2ebd38 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -173,10 +173,11 @@ private[spark] object TestUtils { * Run some code involving jobs submitted to the given context and assert that the jobs spilled. */ def assertSpilled(sc: SparkContext, identifier: String)(body: => Unit): Unit = { -withListener(sc, new SpillListener) { listener => +val listener = new SpillListener +withListener(sc, listener) { _ => body - assert(listener.numSpilledStages > 0, s"expected $identifier to spill, but did not") } +assert(listener.numSpilledStages > 0, s"expected $identifier to spill, but did not") } /** @@ -184,10 +185,11 @@ private[spark] object TestUtils { * did not spill. */ def assertNotSpilled(sc: SparkContext, identifier: String)(body: => Unit): Unit = { -withListener(sc, new SpillListener) { listener => +val listener = new SpillListener +withListener(sc, listener) { _ => body - assert(listener.numSpilledStages == 0, s"expected $identifier to not spill, but did") } +assert(listener.numSpilledStages == 0, s"expected $identifier to not spill, but did") } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
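For context, a simplified, hypothetical Scala sketch of what a spill-counting listener does — the real `SpillListener` in `TestUtils` is more involved; the point of the fix above is only that the final assertion must run after `withListener` has drained the listener bus, so that counters like this one are final:

{% highlight scala %}
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Records the ids of stages in which at least one task spilled to memory or disk.
class SimpleSpillListener extends SparkListener {
  private val spilledStageIds = mutable.Set[Int]()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val metrics = taskEnd.taskMetrics
    if (metrics != null &&
        (metrics.memoryBytesSpilled > 0 || metrics.diskBytesSpilled > 0)) {
      spilledStageIds += taskEnd.stageId
    }
  }

  // Only reliable once all events have been delivered, i.e. after the
  // listener bus has been flushed and the listener removed.
  def numSpilledStages: Int = synchronized { spilledStageIds.size }
}
{% endhighlight %}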
spark git commit: [SPARK-25184][SS] Fixed race condition in StreamExecution that caused flaky test in FlatMapGroupsWithState
Repository: spark Updated Branches: refs/heads/master 68ec4d641 -> 310632498 [SPARK-25184][SS] Fixed race condition in StreamExecution that caused flaky test in FlatMapGroupsWithState ## What changes were proposed in this pull request? The race condition that caused the test failure is between 2 threads. - The MicroBatchExecution thread that processes inputs to produce answers and then generates progress events. - The test thread that generates some input data, checks the answer, and then verifies the progress event generated by the query. The synchronization structure between these threads is as follows: 1. The MicroBatchExecution thread, in every batch, does the following in order. a. Processes batch input to generate the answer. b. Signals `awaitProgressLockCondition` to wake up threads waiting for progress using `awaitOffset`. c. Generates the progress event. 2. The test execution thread: a. Calls `awaitOffset` to wait for progress, which waits on `awaitProgressLockCondition`. b. As soon as `awaitProgressLockCondition` is signaled, it moves on in the test to check the answer. c. Finally, it verifies the last generated progress event. What can happen is the following sequence of events: 2a -> 1a -> 1b -> 2b -> 2c -> 1c. In other words, the progress event may be generated after the test tries to verify it. The solution has two steps. 1. Signal the waiting thread after the progress event has been generated, that is, after `finishTrigger()`. 2. Increase the timeout of `awaitProgressLockCondition.await(100 ms)` to a large value. The latter ensures that the test thread keeps waiting on `awaitProgressLockCondition` until the MicroBatchExecution thread explicitly signals it. With the existing small timeout of 100 ms, the following sequence can occur. - The MicroBatchExecution thread updates the committed offsets. - The test thread waiting on `awaitProgressLockCondition` accidentally times out after 100 ms, finds that the committed offsets have been updated, therefore returns from `awaitOffset` and moves on to the progress event tests. - The MicroBatchExecution thread then generates the progress event and signals. But the test thread has already attempted to verify the event and failed. By increasing the timeout to a large value (e.g., `streamingTimeoutMs = 60 seconds`, similar to `awaitInitialization`), the above type of race condition is also avoided. ## How was this patch tested? Ran locally many times. Closes #22182 from tdas/SPARK-25184. 
Authored-by: Tathagata Das Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31063249 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31063249 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31063249 Branch: refs/heads/master Commit: 3106324986612800240bc8c945be90c4cb368d79 Parents: 68ec4d6 Author: Tathagata Das Authored: Wed Aug 22 12:22:53 2018 -0700 Committer: Tathagata Das Committed: Wed Aug 22 12:22:53 2018 -0700 -- .../kafka010/KafkaMicroBatchSourceSuite.scala | 3 +- .../streaming/MicroBatchExecution.scala | 5 ++- .../execution/streaming/StreamExecution.scala | 4 +- .../sql/streaming/StateStoreMetricsTest.scala | 44 +++- .../apache/spark/sql/streaming/StreamTest.scala | 2 +- 5 files changed, 33 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/31063249/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 946b636..c9c5250 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -970,7 +970,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { makeSureGetOffsetCalled, Execute { q => // wait to reach the last offset in every partition -q.awaitOffset(0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L))) +q.awaitOffset( + 0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)), streamingTimeout.toMillis) }, CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), StopStream, http://git-wip-us.apache.org/repos/asf/spark/blob/31063249/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala -- diff --git
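A minimal, self-contained Scala model of the wait/signal protocol described above (a simplified illustration, not the actual StreamExecution code): the waiter loops on its predicate under a lock and relies on an explicit signal sent only after progress has been fully reported, so a short poll timeout cannot let it race ahead of the progress event:

{% highlight scala %}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object ProgressAwait {
  private val lock = new ReentrantLock()
  private val progressCondition = lock.newCondition()
  private var committedBatchId: Long = -1L

  // "MicroBatchExecution" side: called only after the progress event has been emitted.
  def markProgress(batchId: Long): Unit = {
    lock.lock()
    try {
      committedBatchId = batchId
      progressCondition.signalAll()
    } finally lock.unlock()
  }

  // Test side: waits until the target batch is reported, with one generous overall
  // timeout instead of a tight 100 ms await that can return before the signal.
  def awaitBatch(target: Long, timeoutMs: Long): Boolean = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    lock.lock()
    try {
      while (committedBatchId < target) {
        val remainingNanos = deadline - System.nanoTime()
        if (remainingNanos <= 0) return false
        progressCondition.awaitNanos(remainingNanos)
      }
      true
    } finally lock.unlock()
  }
}
{% endhighlight %}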
svn commit: r28897 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_22_12_02-68ec4d6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Aug 22 19:16:30 2018 New Revision: 28897 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_22_12_02-68ec4d6 docs [This commit notification would consist of 1478 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25181][CORE] Limit Thread Pool size in BlockManager Master and Slave endpoints
Repository: spark Updated Branches: refs/heads/master 2381953ab -> 68ec4d641 [SPARK-25181][CORE] Limit Thread Pool size in BlockManager Master and Slave endpoints ## What changes were proposed in this pull request? Limit Thread Pool size in BlockManager Master and Slave endpoints. Currently, BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint both have thread pools with nearly unbounded (Integer.MAX_VALUE) numbers of threads. In certain cases, this can lead to driver OOM errors. This change limits the thread pools to 100 threads; this should not break any existing behavior because any tasks beyond that number will get queued. ## How was this patch tested? Manual testing Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #22176 from mukulmurthy/25181-threads. Authored-by: Mukul Murthy Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68ec4d64 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68ec4d64 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68ec4d64 Branch: refs/heads/master Commit: 68ec4d641b87d2ab6a8cafc5d10c08253ae09e3d Parents: 2381953 Author: Mukul Murthy Authored: Wed Aug 22 10:36:20 2018 -0700 Committer: Shixiong Zhu Committed: Wed Aug 22 10:36:20 2018 -0700 -- .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 3 ++- .../org/apache/spark/storage/BlockManagerSlaveEndpoint.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/68ec4d64/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 8e8f7d1..f984cf7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -54,7 +54,8 @@ class BlockManagerMasterEndpoint( // Mapping from block id to the set of block managers that have the block. 
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] - private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool") + private val askThreadPool = +ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100) private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool) private val topologyMapper = { http://git-wip-us.apache.org/repos/asf/spark/blob/68ec4d64/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala index 742cf4f..67544b2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala @@ -37,7 +37,7 @@ class BlockManagerSlaveEndpoint( extends ThreadSafeRpcEndpoint with Logging { private val asyncThreadPool = - ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool") + ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool", 100) private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool) // Operations that involve removing blocks may be slow and should be done asynchronously - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
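For illustration, a rough Scala equivalent of a capped "cached" daemon pool built directly on `java.util.concurrent` (Spark's actual helper is `ThreadUtils.newDaemonCachedThreadPool(prefix, maxThreadNumber)`; the `BoundedPools` object and the details below are only a sketch): work beyond the cap queues up instead of spawning an unbounded number of threads.

{% highlight scala %}
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object BoundedPools {
  def boundedDaemonPool(prefix: String, maxThreads: Int = 100): ThreadPoolExecutor = {
    val factory = new ThreadFactory {
      private val counter = new AtomicInteger(0)
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true) // daemon threads do not block JVM shutdown
        t
      }
    }
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable](), factory) // unbounded queue: extra tasks wait
    pool.allowCoreThreadTimeOut(true) // let idle threads die, like a cached pool
    pool
  }
}
{% endhighlight %}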
spark git commit: [SPARK-25105][PYSPARK][SQL] Include PandasUDFType in the import all of pyspark.sql.functions
Repository: spark Updated Branches: refs/heads/master 71f38ac24 -> 2381953ab [SPARK-25105][PYSPARK][SQL] Include PandasUDFType in the import all of pyspark.sql.functions ## What changes were proposed in this pull request? Include PandasUDFType in the import all of pyspark.sql.functions ## How was this patch tested? Run the test case from the pyspark shell from the jira [spark-25105](https://jira.apache.org/jira/browse/SPARK-25105?jql=project%20%3D%20SPARK%20AND%20component%20in%20(ML%2C%20PySpark%2C%20SQL%2C%20%22Structured%20Streaming%22)) I manually test on pyspark-shell: before: ` >>> from pyspark.sql.functions import * >>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP) Traceback (most recent call last): File "", line 1, in NameError: name 'PandasUDFType' is not defined >>> ` after: ` >>> from pyspark.sql.functions import * >>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP) >>> ` Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #22100 from kevinyu98/spark-25105. Authored-by: Kevin Yu Signed-off-by: Bryan Cutler Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2381953a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2381953a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2381953a Branch: refs/heads/master Commit: 2381953ab5d9e86d87a9ef118f28bc3f67d6d805 Parents: 71f38ac Author: Kevin Yu Authored: Wed Aug 22 10:16:47 2018 -0700 Committer: Bryan Cutler Committed: Wed Aug 22 10:16:47 2018 -0700 -- python/pyspark/sql/functions.py | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2381953a/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index f583373..d58d8d1 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2931,6 +2931,7 @@ def pandas_udf(f=None, returnType=None, functionType=None): blacklist = ['map', 'since', 'ignore_unicode_prefix'] __all__ = [k for k, v in globals().items() if not k.startswith('_') and k[0].islower() and callable(v) and k not in blacklist] +__all__ += ["PandasUDFType"] __all__.sort() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23698][PYTHON] Resolve undefined names in Python 3
Repository: spark Updated Branches: refs/heads/master e75488718 -> 71f38ac24 [SPARK-23698][PYTHON] Resolve undefined names in Python 3 ## What changes were proposed in this pull request? Fix issues arising from the fact that builtins __file__, __long__, __raw_input()__, __unicode__, __xrange()__, etc. were all removed from Python 3. __Undefined names__ have the potential to raise [NameError](https://docs.python.org/3/library/exceptions.html#NameError) at runtime. ## How was this patch tested? * $ __python2 -m flake8 . --count --select=E9,F82 --show-source --statistics__ * $ __python3 -m flake8 . --count --select=E9,F82 --show-source --statistics__ holdenk flake8 testing of https://github.com/apache/spark on Python 3.6.3 $ __python3 -m flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics__ ``` ./dev/merge_spark_pr.py:98:14: F821 undefined name 'raw_input' result = raw_input("\n%s (y/n): " % prompt) ^ ./dev/merge_spark_pr.py:136:22: F821 undefined name 'raw_input' primary_author = raw_input( ^ ./dev/merge_spark_pr.py:186:16: F821 undefined name 'raw_input' pick_ref = raw_input("Enter a branch name [%s]: " % default_branch) ^ ./dev/merge_spark_pr.py:233:15: F821 undefined name 'raw_input' jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id) ^ ./dev/merge_spark_pr.py:278:20: F821 undefined name 'raw_input' fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % default_fix_versions) ^ ./dev/merge_spark_pr.py:317:28: F821 undefined name 'raw_input' raw_assignee = raw_input( ^ ./dev/merge_spark_pr.py:430:14: F821 undefined name 'raw_input' pr_num = raw_input("Which pull request would you like to merge? (e.g. 34): ") ^ ./dev/merge_spark_pr.py:442:18: F821 undefined name 'raw_input' result = raw_input("Would you like to use the modified title? (y/n): ") ^ ./dev/merge_spark_pr.py:493:11: F821 undefined name 'raw_input' while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y": ^ ./dev/create-release/releaseutils.py:58:16: F821 undefined name 'raw_input' response = raw_input("%s [y/n]: " % msg) ^ ./dev/create-release/releaseutils.py:152:38: F821 undefined name 'unicode' author = unidecode.unidecode(unicode(author, "UTF-8")).strip() ^ ./python/setup.py:37:11: F821 undefined name '__version__' VERSION = __version__ ^ ./python/pyspark/cloudpickle.py:275:18: F821 undefined name 'buffer' dispatch[buffer] = save_buffer ^ ./python/pyspark/cloudpickle.py:807:18: F821 undefined name 'file' dispatch[file] = save_file ^ ./python/pyspark/sql/conf.py:61:61: F821 undefined name 'unicode' if not isinstance(obj, str) and not isinstance(obj, unicode): ^ ./python/pyspark/sql/streaming.py:25:21: F821 undefined name 'long' intlike = (int, long) ^ ./python/pyspark/streaming/dstream.py:405:35: F821 undefined name 'long' return self._sc._jvm.Time(long(timestamp * 1000)) ^ ./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:21:10: F821 undefined name 'xrange' for i in xrange(50): ^ ./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:22:14: F821 undefined name 'xrange' for j in xrange(5): ^ ./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:23:18: F821 undefined name 'xrange' for k in xrange(20022): ^ 20F821 undefined name 'raw_input' 20 ``` Closes #20838 from cclauss/fix-undefined-names. 
Authored-by: cclauss Signed-off-by: Bryan Cutler Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71f38ac2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71f38ac2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71f38ac2 Branch: refs/heads/master Commit: 71f38ac242157cbede684546159f2a27892ee09f Parents: e754887 Author: cclauss Authored: Wed Aug 22 10:06:59 2018 -0700 Committer: Bryan Cutler Committed: Wed Aug 22 10:06:59 2018 -0700 -- dev/create-release/releaseutils.py | 8 +++-- dev/merge_spark_pr.py | 2 +- python/pyspark/sql/conf.py | 5 ++- python/pyspark/sql/streaming.py | 5 +-- python/pyspark/streaming/dstream.py | 2 ++ python/pyspark/streaming/tests.py | 34 +++- .../resources/data/scripts/dumpdata_script.py | 3 ++ 7 files changed, 50 insertions(+), 9 deletions(-) --
svn commit: r28891 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_22_00_02-55f3664-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Aug 22 07:16:44 2018 New Revision: 28891 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_22_00_02-55f3664 docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[6/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index c7b74f3..946b636 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010 import java.io._ import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.{Files, Paths} -import java.util.{Locale, Optional, Properties} +import java.util.{Locale, Properties} import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger @@ -44,11 +44,9 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.sql.sources.v2.DataSourceOptions -import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2} import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} -import org.apache.spark.sql.types.StructType abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest { @@ -118,14 +116,16 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf query.nonEmpty, "Cannot add data when there is no query for finding the active kafka source") - val sources = { + val sources: Seq[BaseStreamingSource] = { query.get.logicalPlan.collect { case StreamingExecutionRelation(source: KafkaSource, _) => source - case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source + case StreamingExecutionRelation(source: KafkaMicroBatchReadSupport, _) => source } ++ (query.get.lastExecution match { case null => Seq() case e => e.logical.collect { -case StreamingDataSourceV2Relation(_, _, _, reader: KafkaContinuousReader) => reader +case r: StreamingDataSourceV2Relation +if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] => + r.readSupport.asInstanceOf[KafkaContinuousReadSupport] } }) }.distinct @@ -650,7 +650,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase { makeSureGetOffsetCalled, AssertOnQuery { query => query.logicalPlan.collect { - case StreamingExecutionRelation(_: KafkaMicroBatchReader, _) => true + case StreamingExecutionRelation(_: KafkaMicroBatchReadSupport, _) => true }.nonEmpty } ) @@ -675,17 +675,16 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase { "kafka.bootstrap.servers" -> testUtils.brokerAddress, "subscribe" -> topic ) ++ Option(minPartitions).map { p => "minPartitions" -> p} -val reader = provider.createMicroBatchReader( - Optional.empty[StructType], dir.getAbsolutePath, new DataSourceOptions(options.asJava)) -reader.setOffsetRange( - Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))), - Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L))) -) -val factories = reader.planInputPartitions().asScala +val readSupport = provider.createMicroBatchReadSupport( + dir.getAbsolutePath, new DataSourceOptions(options.asJava)) +val config = 
readSupport.newScanConfigBuilder( + KafkaSourceOffset(Map(tp -> 0L)), + KafkaSourceOffset(Map(tp -> 100L))).build() +val inputPartitions = readSupport.planInputPartitions(config) .map(_.asInstanceOf[KafkaMicroBatchInputPartition]) -withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") { - assert(factories.size == numPartitionsGenerated) - factories.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) } +withClue(s"minPartitions = $minPartitions generated factories $inputPartitions\n\t") { + assert(inputPartitions.size == numPartitionsGenerated) + inputPartitions.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
[5/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java deleted file mode 100644 index 7b0ba0b..000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader.streaming; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; - -/** - * A variation on {@link InputPartitionReader} for use with streaming in continuous processing mode. - */ -@InterfaceStability.Evolving -public interface ContinuousInputPartitionReader extends InputPartitionReader { -/** - * Get the offset of the current record, or the start offset if no records have been read. - * - * The execution engine will call this method along with get() to keep track of the current - * offset. When an epoch ends, the offset of the previous record in each partition will be saved - * as a restart checkpoint. - */ -PartitionOffset getOffset(); -} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java new file mode 100644 index 000..9101c8a --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.PartitionReader; + +/** + * A variation on {@link PartitionReader} for use with continuous streaming processing. + */ +@InterfaceStability.Evolving +public interface ContinuousPartitionReader extends PartitionReader { + + /** + * Get the offset of the current record, or the start offset if no records have been read. + * + * The execution engine will call this method along with get() to keep track of the current + * offset. When an epoch ends, the offset of the previous record in each partition will be saved + * as a restart checkpoint. + */ + PartitionOffset getOffset(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java new
[3/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala new file mode 100644 index 000..4218fd5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import org.apache.spark.sql.{ForeachWriter, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.python.PythonForeachWriter +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider} +import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport} +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType + +/** + * A [[org.apache.spark.sql.sources.v2.DataSourceV2]] for forwarding data into the specified + * [[ForeachWriter]]. + * + * @param writer The [[ForeachWriter]] to process all data. + * @param converter An object to convert internal rows to target type T. Either it can be + * a [[ExpressionEncoder]] or a direct converter function. + * @tparam T The expected type of the sink. 
+ */ +case class ForeachWriteSupportProvider[T]( +writer: ForeachWriter[T], +converter: Either[ExpressionEncoder[T], InternalRow => T]) + extends StreamingWriteSupportProvider { + + override def createStreamingWriteSupport( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamingWriteSupport = { +new StreamingWriteSupport { + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + + override def createStreamingWriterFactory(): StreamingDataWriterFactory = { +val rowConverter: InternalRow => T = converter match { + case Left(enc) => +val boundEnc = enc.resolveAndBind( + schema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) +boundEnc.fromRow + case Right(func) => +func +} +ForeachWriterFactory(writer, rowConverter) + } + + override def toString: String = "ForeachSink" +} + } +} + +object ForeachWriteSupportProvider { + def apply[T]( + writer: ForeachWriter[T], + encoder: ExpressionEncoder[T]): ForeachWriteSupportProvider[_] = { +writer match { + case pythonWriter: PythonForeachWriter => +new ForeachWriteSupportProvider[UnsafeRow]( + pythonWriter, Right((x: InternalRow) => x.asInstanceOf[UnsafeRow])) + case _ => +new ForeachWriteSupportProvider[T](writer, Left(encoder)) +} + } +} + +case class ForeachWriterFactory[T]( +writer: ForeachWriter[T], +rowConverter: InternalRow => T) + extends StreamingDataWriterFactory { + override def createWriter( + partitionId: Int, + taskId: Long, + epochId: Long): ForeachDataWriter[T] = { +new ForeachDataWriter(writer, rowConverter, partitionId, epochId) + } +} + +/** + * A [[DataWriter]] which writes data in this partition to a [[ForeachWriter]]. + * + * @param writer The [[ForeachWriter]] to process all data. + * @param rowConverter A function which can convert [[InternalRow]] to the required type [[T]] + * @param partitionId + * @param epochId + * @tparam T The type expected by the writer. + */ +class ForeachDataWriter[T]( +writer: ForeachWriter[T], +
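A short usage sketch of the public `ForeachWriter` API that this provider forwards to — the "rate" source, the trivial writer body, and the session setup are illustrative only:

{% highlight scala %}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

object ForeachSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("foreach-example").getOrCreate()

    val writer = new ForeachWriter[Row] {
      // Called once per partition and epoch; return false to skip the partition.
      override def open(partitionId: Long, epochId: Long): Boolean = true
      // Called for every record; a real writer would send it to an external system.
      override def process(value: Row): Unit = println(value)
      // Called when the partition is done (errorOrNull is non-null on failure).
      override def close(errorOrNull: Throwable): Unit = ()
    }

    val query = spark.readStream
      .format("rate") // built-in test source emitting (timestamp, value) rows
      .load()
      .writeStream
      .foreach(writer)
      .start()

    query.awaitTermination()
  }
}
{% endhighlight %}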
[1/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API
Repository: spark Updated Branches: refs/heads/master 55f36641f -> e75488718 http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index 4980b0c..3d21bc6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -41,7 +41,7 @@ class ContinuousSuiteBase extends StreamTest { case s: ContinuousExecution => assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized") val reader = s.lastExecution.executedPlan.collectFirst { - case DataSourceV2ScanExec(_, _, _, _, r: RateStreamContinuousReader) => r + case DataSourceV2ScanExec(_, _, _, _, r: RateStreamContinuousReadSupport, _) => r }.get val deltaMs = numTriggers * 1000 + 300 http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala index 82836dc..3c973d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala @@ -27,9 +27,9 @@ import org.apache.spark._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.sql.LocalSparkSession import org.apache.spark.sql.execution.streaming.continuous._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset} import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage -import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport import org.apache.spark.sql.test.TestSparkSession class EpochCoordinatorSuite @@ -40,20 +40,20 @@ class EpochCoordinatorSuite private var epochCoordinator: RpcEndpointRef = _ - private var writer: StreamWriter = _ + private var writeSupport: StreamingWriteSupport = _ private var query: ContinuousExecution = _ private var orderVerifier: InOrder = _ override def beforeEach(): Unit = { -val reader = mock[ContinuousReader] -writer = mock[StreamWriter] +val reader = mock[ContinuousReadSupport] +writeSupport = mock[StreamingWriteSupport] query = mock[ContinuousExecution] -orderVerifier = inOrder(writer, query) +orderVerifier = inOrder(writeSupport, query) spark = new TestSparkSession() epochCoordinator - = EpochCoordinatorRef.create(writer, reader, query, "test", 1, spark, SparkEnv.get) + = EpochCoordinatorRef.create(writeSupport, reader, query, "test", 1, spark, SparkEnv.get) } test("single epoch") { @@ -209,12 +209,12 @@ class EpochCoordinatorSuite } private def verifyCommit(epoch: Long): Unit = { -orderVerifier.verify(writer).commit(eqTo(epoch), any()) +orderVerifier.verify(writeSupport).commit(eqTo(epoch), any()) orderVerifier.verify(query).commit(epoch) } private def verifyNoCommitFor(epoch: Long): Unit = { -verify(writer, never()).commit(eqTo(epoch), any()) 
+verify(writeSupport, never()).commit(eqTo(epoch), any()) verify(query, never()).commit(epoch) } http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 52b833a..aeef4c8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -17,73 +17,74 @@ package org.apache.spark.sql.streaming.sources -import java.util.Optional - import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.DataSource import
[4/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index 5267f5f..e9cc399 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -21,6 +21,7 @@ import java.util.regex.Pattern import org.apache.spark.internal.Logging import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport} private[sql] object DataSourceV2Utils extends Logging { @@ -55,4 +56,12 @@ private[sql] object DataSourceV2Utils extends Logging { case _ => Map.empty } + + def failForUserSpecifiedSchema[T](ds: DataSourceV2): T = { +val name = ds match { + case register: DataSourceRegister => register.shortName() + case _ => ds.getClass.getName +} +throw new UnsupportedOperationException(name + " source does not support user-specified schema") + } } http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 59ebb9b..c3f7b69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -23,15 +23,11 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.executor.CommitDeniedException import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.streaming.MicroBatchExecution import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils /** @@ -39,7 +35,8 @@ import org.apache.spark.util.Utils * specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]]. */ @deprecated("Use specific logical plans like AppendData instead", "2.4.0") -case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) extends LogicalPlan { +case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPlan) + extends LogicalPlan { override def children: Seq[LogicalPlan] = Seq(query) override def output: Seq[Attribute] = Nil } @@ -47,46 +44,48 @@ case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) ext /** * The physical plan for writing data into data source v2. 
*/ -case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) extends SparkPlan { +case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan) + extends SparkPlan { + override def children: Seq[SparkPlan] = Seq(query) override def output: Seq[Attribute] = Nil override protected def doExecute(): RDD[InternalRow] = { -val writeTask = writer.createWriterFactory() -val useCommitCoordinator = writer.useCommitCoordinator +val writerFactory = writeSupport.createBatchWriterFactory() +val useCommitCoordinator = writeSupport.useCommitCoordinator val rdd = query.execute() val messages = new Array[WriterCommitMessage](rdd.partitions.length) -logInfo(s"Start processing data source writer: $writer. " + +logInfo(s"Start processing data source write support: $writeSupport. " + s"The input RDD has ${messages.length} partitions.") try { sparkContext.runJob( rdd, (context: TaskContext, iter: Iterator[InternalRow]) => - DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator), + DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator), rdd.partitions.indices, (index, message: WriterCommitMessage) => { messages(index) = message - writer.onDataWriterCommit(message) +
[2/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java -- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java index 274dc37..2cdbba8 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java @@ -17,72 +17,26 @@ package test.org.apache.spark.sql.sources.v2; -import java.io.IOException; -import java.util.List; - -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.sources.v2.BatchReadSupportProvider; import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.ReadSupport; -import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; -import org.apache.spark.sql.sources.v2.reader.InputPartition; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.types.StructType; - -public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport { - - class Reader implements DataSourceReader { -private final StructType schema = new StructType().add("i", "int").add("j", "int"); - -@Override -public StructType readSchema() { - return schema; -} - -@Override -public List> planInputPartitions() { - return java.util.Arrays.asList( -new JavaSimpleInputPartition(0, 5), -new JavaSimpleInputPartition(5, 10)); -} - } - - static class JavaSimpleInputPartition implements InputPartition, -InputPartitionReader { +import org.apache.spark.sql.sources.v2.reader.*; -private int start; -private int end; +public class JavaSimpleDataSourceV2 implements DataSourceV2, BatchReadSupportProvider { -JavaSimpleInputPartition(int start, int end) { - this.start = start; - this.end = end; -} - -@Override -public InputPartitionReader createPartitionReader() { - return new JavaSimpleInputPartition(start - 1, end); -} + class ReadSupport extends JavaSimpleReadSupport { @Override -public boolean next() { - start += 1; - return start < end; -} - -@Override -public InternalRow get() { - return new GenericInternalRow(new Object[] {start, -start}); -} - -@Override -public void close() throws IOException { - +public InputPartition[] planInputPartitions(ScanConfig config) { + InputPartition[] partitions = new InputPartition[2]; + partitions[0] = new JavaRangeInputPartition(0, 5); + partitions[1] = new JavaRangeInputPartition(5, 10); + return partitions; } } @Override - public DataSourceReader createReader(DataSourceOptions options) { -return new Reader(); + public BatchReadSupport createBatchReadSupport(DataSourceOptions options) { +return new ReadSupport(); } } http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java -- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java new file mode 100644 index 000..685f9b9 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.sources.v2; + +import java.io.IOException; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.sources.v2.reader.*; +import org.apache.spark.sql.types.StructType; + +abstract class JavaSimpleReadSupport implements
[7/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API
[SPARK-24882][SQL] improve data source v2 API

## What changes were proposed in this pull request?

Improve the data source v2 API according to the [design doc](https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing).

Summary of the changes:

1. Rename `ReadSupport` -> `DataSourceReader` -> `InputPartition` -> `InputPartitionReader` to `BatchReadSupportProvider` -> `BatchReadSupport` -> `InputPartition`/`PartitionReaderFactory` -> `PartitionReader`. Similar renaming also happens in the streaming and write APIs.
2. Create `ScanConfig` to store query-specific information like the operator pushdown result, streaming offsets, etc. This makes the batch and streaming `ReadSupport` (previously named `DataSourceReader`) immutable. All other methods take `ScanConfig` as input, which implies that applying operator pushdown and getting streaming offsets happens before everything else (getting input partitions, reporting statistics, etc.).
3. Separate `InputPartition` into `InputPartition` and `PartitionReaderFactory`. This is a natural separation: data splitting and reading are orthogonal and we should not mix them in one interface. This also makes the naming consistent between the read and write APIs: `PartitionReaderFactory` vs `DataWriterFactory`.
4. Separate the batch and streaming interfaces. It is sometimes painful to force the streaming interface to extend the batch interface, as we may need to override some batch methods to return false, or even leak streaming concepts into the batch API (e.g. `DataWriterFactory#createWriter(partitionId, taskId, epochId)`).

Some follow-ups we should do after this PR (tracked by https://issues.apache.org/jira/browse/SPARK-25186 ):

1. Revisit the life cycle of `ReadSupport` instances. Currently I keep it the same as the previous `DataSourceReader`, i.e. the life cycle is bound to the batch/stream query. This fits streaming very well but may not be perfect for batch sources. We could also consider letting `ReadSupport.newScanConfigBuilder` take `DataSourceOptions` as a parameter, if we decide to change the life cycle.
2. Add `WriteConfig`. This is similar to `ScanConfig` and makes the write API more flexible. But it is only needed when we add `replaceWhere` support, and it requires changing the streaming execution engine for this new concept, which I think is better done in another PR.
3. Refine the documentation. This PR adds/changes a lot of documentation and it is very likely that some people have better ideas.
4. Figure out the life cycle of `CustomMetrics`. It looks to me like it should be bound to a `ScanConfig`, but we need to change `ProgressReporter` to get the `ScanConfig`. Better done in another PR.
5. Better operator pushdown API. This PR keeps the pushdown API as it was, i.e. using the `SupportsPushdownXYZ` traits. We could design a better API using a builder pattern, but that is a complicated design and deserves an individual JIRA ticket and design doc.
6. Improve the continuous streaming engine to only create a new `ScanConfig` when re-configuring.
7. Remove `SupportsPushdownCatalystFilter`. This is not actually a must-have for the file source; we can change the Hive partition pruning to use the public `Filter`.

## How was this patch tested?

Existing tests.

Closes #22009 from cloud-fan/redesign.
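To make the new shape of the batch read path easier to follow alongside the diffs in this email, here is a small, self-contained Scala sketch. The traits below are simplified stand-ins that only mirror the relationships described in points 1-3 above (provider -> read support -> partitions + reader factory); they are NOT the actual `org.apache.spark.sql.sources.v2` interfaces, whose exact signatures (options, schema, `ScanConfigBuilder`, etc.) should be taken from the Java/Scala files added in this commit. The `RangePartition`/`RangeSource` names are hypothetical.

// Simplified stand-ins for the renamed v2 read interfaces (illustrative only).
trait InputPartition extends Serializable   // describes a chunk of data; no reading logic here
trait ScanConfig                            // per-scan state: pushdown results, streaming offsets, ...
trait PartitionReader[T] {
  def next(): Boolean
  def get(): T
  def close(): Unit
}
trait PartitionReaderFactory {              // reading logic, factored out of InputPartition
  def createReader(partition: InputPartition): PartitionReader[Int]
}
trait BatchReadSupport {                    // immutable; all methods take a ScanConfig
  def planInputPartitions(config: ScanConfig): Array[InputPartition]
  def createReaderFactory(config: ScanConfig): PartitionReaderFactory
}
trait BatchReadSupportProvider {            // entry point implemented by the data source
  def createBatchReadSupport(): BatchReadSupport
}

// A toy source producing two integer ranges, loosely mirroring the rewritten
// JavaSimpleDataSourceV2 test in part 2/7.
case class RangePartition(start: Int, end: Int) extends InputPartition

object RangeSource extends BatchReadSupportProvider {
  override def createBatchReadSupport(): BatchReadSupport = new BatchReadSupport {
    override def planInputPartitions(config: ScanConfig): Array[InputPartition] =
      Array(RangePartition(0, 5), RangePartition(5, 10))

    override def createReaderFactory(config: ScanConfig): PartitionReaderFactory =
      new PartitionReaderFactory {
        override def createReader(partition: InputPartition): PartitionReader[Int] = {
          val RangePartition(start, end) = partition.asInstanceOf[RangePartition]
          new PartitionReader[Int] {
            private var i = start - 1
            override def next(): Boolean = { i += 1; i < end }
            override def get(): Int = i
            override def close(): Unit = ()
          }
        }
      }
  }
}

The structural point of the sketch: the read-support object carries no mutable scan state (anything the query negotiates flows in through `ScanConfig`), and data splitting (`InputPartition`) is a separate concern from row reading (`PartitionReaderFactory`/`PartitionReader`), matching `DataWriterFactory` on the write side.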
Authored-by: Wenchen Fan
Signed-off-by: Xiao Li

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7548871
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7548871
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7548871

Branch: refs/heads/master
Commit: e754887182304ad0d622754e33192ebcdd515965
Parents: 55f3664
Author: Wenchen Fan
Authored: Wed Aug 22 00:10:55 2018 -0700
Committer: Xiao Li
Committed: Wed Aug 22 00:10:55 2018 -0700
--
 .../kafka010/KafkaContinuousReadSupport.scala  | 255 +++
 .../sql/kafka010/KafkaContinuousReader.scala   | 248 ---
 .../kafka010/KafkaMicroBatchReadSupport.scala  | 401 +
 .../sql/kafka010/KafkaMicroBatchReader.scala   | 402 -
 .../sql/kafka010/KafkaSourceProvider.scala     |  37 +-
 .../spark/sql/kafka010/KafkaStreamWriter.scala | 118 -
 .../kafka010/KafkaStreamingWriteSupport.scala  | 118 +
 .../kafka010/KafkaContinuousSourceSuite.scala  |   8 +-
 .../sql/kafka010/KafkaContinuousTest.scala     |   8 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala  |  33 +-
 .../sources/v2/BatchReadSupportProvider.java   |  61 +++
 .../sources/v2/BatchWriteSupportProvider.java  |  59 +++
 .../sql/sources/v2/ContinuousReadSupport.java  |  46 --
 .../v2/ContinuousReadSupportProvider.java      |  70 +++
 .../spark/sql/sources/v2/DataSourceV2.java     |  10 +-
 .../sql/sources/v2/MicroBatchReadSupport.java  |  52 ---
 .../v2/MicroBatchReadSupportProvider.java      |  70 +++
spark git commit: [SPARK-25093][SQL] Avoid recompiling regexp for comments multiple times
Repository: spark Updated Branches: refs/heads/master 4a9c9d8f9 -> 55f36641f [SPARK-25093][SQL] Avoid recompiling regexp for comments multiple times ## What changes were proposed in this pull request? The PR moves the compilation of the regexp for code formatting outside the method which is called for each code block when splitting expressions, in order to avoid recompiling the regexp every time. Credit should be given to Izek Greenfield. ## How was this patch tested? existing UTs Closes #22135 from mgaido91/SPARK-25093. Authored-by: Marco Gaido Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55f36641 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55f36641 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55f36641 Branch: refs/heads/master Commit: 55f36641ff20114b892795f100da7efb79b0cc32 Parents: 4a9c9d8 Author: Marco Gaido Authored: Wed Aug 22 14:31:51 2018 +0800 Committer: Wenchen Fan Committed: Wed Aug 22 14:31:51 2018 +0800 -- .../scala/org/apache/spark/deploy/worker/Worker.scala| 4 ++-- core/src/main/scala/org/apache/spark/util/Utils.scala| 11 ++- .../apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala | 6 +++--- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 3 ++- .../sql/catalyst/expressions/codegen/CodeFormatter.scala | 10 +- .../main/scala/org/apache/spark/sql/types/DataType.scala | 3 ++- .../org/apache/spark/streaming/dstream/DStream.scala | 10 +- 7 files changed, 25 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/55f36641/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index ee1ca0b..cbd812a 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -758,6 +758,7 @@ private[deploy] class Worker( private[deploy] object Worker extends Logging { val SYSTEM_NAME = "sparkWorker" val ENDPOINT_NAME = "Worker" + private val SSL_NODE_LOCAL_CONFIG_PATTERN = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r def main(argStrings: Array[String]) { Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler( @@ -803,9 +804,8 @@ private[deploy] object Worker extends Logging { } def isUseLocalNodeSSLConfig(cmd: Command): Boolean = { -val pattern = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r val result = cmd.javaOpts.collectFirst { - case pattern(_result) => _result.toBoolean + case SSL_NODE_LOCAL_CONFIG_PATTERN(_result) => _result.toBoolean } result.getOrElse(false) } http://git-wip-us.apache.org/repos/asf/spark/blob/55f36641/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 7ec707d..e6646bd 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1409,13 +1409,14 @@ private[spark] object Utils extends Logging { } } + // A regular expression to match classes of the internal Spark API's + // that we want to skip when finding the call site of a method. 
+ private val SPARK_CORE_CLASS_REGEX = + """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r + private val SPARK_SQL_CLASS_REGEX = """^org\.apache\.spark\.sql.*""".r + /** Default filtering function for finding call sites using `getCallSite`. */ private def sparkInternalExclusionFunction(className: String): Boolean = { -// A regular expression to match classes of the internal Spark API's -// that we want to skip when finding the call site of a method. -val SPARK_CORE_CLASS_REGEX = - """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r -val SPARK_SQL_CLASS_REGEX = """^org\.apache\.spark\.sql.*""".r val SCALA_CORE_CLASS_PREFIX = "scala" val isSparkClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined || SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined http://git-wip-us.apache.org/repos/asf/spark/blob/55f36641/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala