spark git commit: [SPARK-21549][CORE] Respect OutputFormats with no output directory provided
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 8a4e7dd89 -> 0d3f1667e

[SPARK-21549][CORE] Respect OutputFormats with no output directory provided

## What changes were proposed in this pull request?

Fix for the https://issues.apache.org/jira/browse/SPARK-21549 JIRA issue.

Since version 2.2, Spark does not respect OutputFormats with no output path provided. Examples of such formats are [Cassandra OutputFormat](https://github.com/finn-no/cassandra-hadoop/blob/08dfa3a7ac727bb87269f27a1c82ece54e3f67e6/src/main/java/org/apache/cassandra/hadoop2/AbstractColumnFamilyOutputFormat.java), [Aerospike OutputFormat](https://github.com/aerospike/aerospike-hadoop/blob/master/mapreduce/src/main/java/com/aerospike/hadoop/mapreduce/AerospikeOutputFormat.java), etc., which have no ability to roll back results already written to an external system on job failure.

Spark requires the output directory so that files can be committed to an absolute output location, but that is not the case for output formats which write data to external systems. This pull request avoids accessing the `absPathStagingDir` path (the cause of the error described in SPARK-21549) unless there are files to rename in `addedAbsPathFiles`.

## How was this patch tested?

Unit tests

Author: Sergey Zhemzhitsky

Closes #19294 from szhem/SPARK-21549-abs-output-commits.

(cherry picked from commit 2030f19511f656e9534f3fd692e622e45f9a074e)
Signed-off-by: Mridul Muralidharan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d3f1667
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d3f1667
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d3f1667

Branch: refs/heads/branch-2.2
Commit: 0d3f1667e92debbe2c3d412ea8e9c6eba752ef53
Parents: 8a4e7dd
Author: Sergey Zhemzhitsky
Authored: Fri Oct 6 20:43:53 2017 -0700
Committer: Mridul Muralidharan
Committed: Fri Oct 6 20:44:47 2017 -0700

--
 .../io/HadoopMapReduceCommitProtocol.scala | 28 +
 .../spark/rdd/PairRDDFunctionsSuite.scala  | 33 +++-
 2 files changed, 54 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/0d3f1667/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 22e2679..4d42a66 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -35,6 +35,9 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
 * (from the newer mapreduce API, not the old mapred API).
 *
 * Unlike Hadoop's OutputCommitter, this implementation is serializable.
+ *
+ * @param jobId the job's or stage's id
+ * @param path the job's output path, or null if committer acts as a noop
 */
 class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 extends FileCommitProtocol with Serializable with Logging {
@@ -57,6 +60,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 */
 private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)

+ /**
+ * Checks whether there are files to be committed to an absolute output location.
+ *
+ * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
+ * it is necessary to check whether the output path is specified. Output path may not be required
+ * for committers not writing to distributed file systems.
+ */
+ private def hasAbsPathFiles: Boolean = path != null
+
 protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
 val format = context.getOutputFormatClass.newInstance()
 // If OutputFormat is Configurable, we should set conf to it.
@@ -129,17 +141,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
 .foldLeft(Map[String, String]())(_ ++ _)
 logDebug(s"Committing files staged for absolute locations $filesToMove")
-val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-for ((src, dst) <- filesToMove) {
- fs.rename(new Path(src), new Path(dst))
+if (hasAbsPathFiles) {
+ val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+ for ((src, dst) <- filesToMove) {
+fs.rename(new Path(src), new Path(dst))
+ }
+ fs.delete(absPathStagingDir, true)
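The scenario this guard enables is a job whose OutputFormat writes straight to an external store and therefore never configures an output directory. A minimal sketch under that assumption (the class and object names below, such as ExternalSystemOutputFormat, are illustrative and not taken from the patch or its test suite); with the guard in place the job commits even though no output path exists, whereas previously the commit failed while building the absolute-path staging directory:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{Job, JobContext, OutputCommitter, OutputFormat, RecordWriter, TaskAttemptContext}
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical OutputFormat that pushes records to an external system,
// so it has no output path and nothing for Spark to stage or rename.
class ExternalSystemOutputFormat extends OutputFormat[String, String] {
  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[String, String] =
    new RecordWriter[String, String] {
      override def write(key: String, value: String): Unit = {
        // send the record to the external system here
      }
      override def close(context: TaskAttemptContext): Unit = {}
    }

  // Nothing to validate: there is no output directory.
  override def checkOutputSpecs(context: JobContext): Unit = {}

  // A committer that does nothing, since the external system cannot roll back anyway.
  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter =
    new OutputCommitter {
      override def setupJob(jobContext: JobContext): Unit = {}
      override def setupTask(taskContext: TaskAttemptContext): Unit = {}
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
      override def commitTask(taskContext: TaskAttemptContext): Unit = {}
      override def abortTask(taskContext: TaskAttemptContext): Unit = {}
    }
}

object NoOutputPathExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("no-output-path").setMaster("local[*]"))
    val job = Job.getInstance(new Configuration(sc.hadoopConfiguration))
    job.setOutputFormatClass(classOf[ExternalSystemOutputFormat])
    job.setOutputKeyClass(classOf[String])
    job.setOutputValueClass(classOf[String])
    // Note: no FileOutputFormat.setOutputPath(...) call anywhere.
    sc.parallelize(Seq("a" -> "1", "b" -> "2")).saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
```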
spark git commit: [SPARK-21549][CORE] Respect OutputFormats with no output directory provided
Repository: spark
Updated Branches:
  refs/heads/master debcbec74 -> 2030f1951

[SPARK-21549][CORE] Respect OutputFormats with no output directory provided

## What changes were proposed in this pull request?

Fix for the https://issues.apache.org/jira/browse/SPARK-21549 JIRA issue.

Since version 2.2, Spark does not respect OutputFormats with no output path provided. Examples of such formats are [Cassandra OutputFormat](https://github.com/finn-no/cassandra-hadoop/blob/08dfa3a7ac727bb87269f27a1c82ece54e3f67e6/src/main/java/org/apache/cassandra/hadoop2/AbstractColumnFamilyOutputFormat.java), [Aerospike OutputFormat](https://github.com/aerospike/aerospike-hadoop/blob/master/mapreduce/src/main/java/com/aerospike/hadoop/mapreduce/AerospikeOutputFormat.java), etc., which have no ability to roll back results already written to an external system on job failure.

Spark requires the output directory so that files can be committed to an absolute output location, but that is not the case for output formats which write data to external systems. This pull request avoids accessing the `absPathStagingDir` path (the cause of the error described in SPARK-21549) unless there are files to rename in `addedAbsPathFiles`.

## How was this patch tested?

Unit tests

Author: Sergey Zhemzhitsky

Closes #19294 from szhem/SPARK-21549-abs-output-commits.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2030f195
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2030f195
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2030f195

Branch: refs/heads/master
Commit: 2030f19511f656e9534f3fd692e622e45f9a074e
Parents: debcbec
Author: Sergey Zhemzhitsky
Authored: Fri Oct 6 20:43:53 2017 -0700
Committer: Mridul Muralidharan
Committed: Fri Oct 6 20:43:53 2017 -0700

--
 .../io/HadoopMapReduceCommitProtocol.scala | 28 +
 .../spark/rdd/PairRDDFunctionsSuite.scala  | 33 +++-
 2 files changed, 54 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/2030f195/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index b1d07ab..a7e6859 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -35,6 +35,9 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
 * (from the newer mapreduce API, not the old mapred API).
 *
 * Unlike Hadoop's OutputCommitter, this implementation is serializable.
+ *
+ * @param jobId the job's or stage's id
+ * @param path the job's output path, or null if committer acts as a noop
 */
 class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 extends FileCommitProtocol with Serializable with Logging {
@@ -57,6 +60,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 */
 private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)

+ /**
+ * Checks whether there are files to be committed to an absolute output location.
+ *
+ * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
+ * it is necessary to check whether the output path is specified. Output path may not be required
+ * for committers not writing to distributed file systems.
+ */
+ private def hasAbsPathFiles: Boolean = path != null
+
 protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
 val format = context.getOutputFormatClass.newInstance()
 // If OutputFormat is Configurable, we should set conf to it.
@@ -130,17 +142,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
 .foldLeft(Map[String, String]())(_ ++ _)
 logDebug(s"Committing files staged for absolute locations $filesToMove")
-val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-for ((src, dst) <- filesToMove) {
- fs.rename(new Path(src), new Path(dst))
+if (hasAbsPathFiles) {
+ val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+ for ((src, dst) <- filesToMove) {
+fs.rename(new Path(src), new Path(dst))
+ }
+ fs.delete(absPathStagingDir, true)
 }
-fs.delete(absPathStagingDir, true)
 }

 override def abortJob(jobContext: JobContext): Unit = {
spark git commit: [SPARK-21947][SS] Check and report error when monotonically_increasing_id is used in streaming query
Repository: spark
Updated Branches:
  refs/heads/master 08b204fd2 -> debcbec74

[SPARK-21947][SS] Check and report error when monotonically_increasing_id is used in streaming query

## What changes were proposed in this pull request?

`monotonically_increasing_id` doesn't work in Structured Streaming. We should throw an exception if a streaming query uses it.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh

Closes #19336 from viirya/SPARK-21947.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/debcbec7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/debcbec7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/debcbec7

Branch: refs/heads/master
Commit: debcbec7491d3a23b19ef149e50d2887590b6de0
Parents: 08b204f
Author: Liang-Chi Hsieh
Authored: Fri Oct 6 13:10:04 2017 -0700
Committer: Shixiong Zhu
Committed: Fri Oct 6 13:10:04 2017 -0700

--
 .../analysis/UnsupportedOperationChecker.scala | 15 ++-
 .../analysis/UnsupportedOperationsSuite.scala  | 10 +-
 2 files changed, 23 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/debcbec7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index dee6fbe..04502d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, MonotonicallyIncreasingID}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans._
@@ -129,6 +129,16 @@ object UnsupportedOperationChecker {
 !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
 }
+def checkUnsupportedExpressions(implicit operator: LogicalPlan): Unit = {
+ val unsupportedExprs = operator.expressions.flatMap(_.collect {
+case m: MonotonicallyIncreasingID => m
+ }).distinct
+ if (unsupportedExprs.nonEmpty) {
+throwError("Expression(s): " + unsupportedExprs.map(_.sql).mkString(", ") +
+ " is not supported with streaming DataFrames/Datasets")
+ }
+}
+
 plan.foreachUp { implicit subPlan =>
 // Operations that cannot exists anywhere in a streaming plan
@@ -323,6 +333,9 @@ object UnsupportedOperationChecker {
 case _ =>
 }
+
+ // Check if there are unsupported expressions in streaming query plan.
+ checkUnsupportedExpressions(subPlan)
 }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/debcbec7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index e5057c4..60d1351 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, MonotonicallyIncreasingID, NamedExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _}
@@ -614,6 +614,14 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
 testOutputMode(Update, shouldSupportAggregation = true,
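For users, the visible effect of this check is an up-front AnalysisException instead of a query that silently misbehaves. A small illustrative sketch (the rate source and console sink are only convenient choices here, not part of the patch); the check fires when the streaming query is started:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

object MonotonicIdInStreamExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("mid-check").getOrCreate()

    // A streaming DataFrame; the built-in rate source emits (timestamp, value) rows.
    val stream = spark.readStream.format("rate").load()

    // With this change, starting the query fails analysis with an AnalysisException
    // naming monotonically_increasing_id() as unsupported with streaming
    // DataFrames/Datasets, rather than running and producing unreliable ids.
    val query = stream
      .withColumn("id", monotonically_increasing_id())
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```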
spark git commit: [SPARK-22214][SQL] Refactor the list hive partitions code
Repository: spark
Updated Branches:
  refs/heads/master c7b46d4d8 -> 08b204fd2

[SPARK-22214][SQL] Refactor the list hive partitions code

## What changes were proposed in this pull request?

In this PR we make a few changes to the list hive partitions code, to make the code more extensible. The following changes are made:

1. In `HiveClientImpl.getPartitions()`, call `client.getPartitions` instead of `shim.getAllPartitions` when `spec` is empty;
2. In `HiveTableScanExec`, previously we always call `listPartitionsByFilter` if the config `metastorePartitionPruning` is enabled, but actually, we'd better call `listPartitions` if `partitionPruningPred` is empty (see the sketch after this entry's diff);
3. We should use sessionCatalog instead of SharedState.externalCatalog in `HiveTableScanExec`.

## How was this patch tested?

Tested by existing test cases, since this is a code refactor; no regression or behavior change is expected.

Author: Xingbo Jiang

Closes #19444 from jiangxb1987/hivePartitions.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08b204fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08b204fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08b204fd

Branch: refs/heads/master
Commit: 08b204fd2c731e87d3bc2cc0bccb6339ef7e3a6e
Parents: c7b46d4
Author: Xingbo Jiang
Authored: Fri Oct 6 12:53:35 2017 -0700
Committer: gatorsmile
Committed: Fri Oct 6 12:53:35 2017 -0700

--
 .../spark/sql/catalyst/catalog/interface.scala | 5
 .../spark/sql/hive/client/HiveClientImpl.scala | 7 ++---
 .../sql/hive/execution/HiveTableScanExec.scala | 28 +---
 3 files changed, 22 insertions(+), 18 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index fe2af91..975b084 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -405,6 +405,11 @@ object CatalogTypes {
 * Specifications of a table partition. Mapping column name to column value.
 */
 type TablePartitionSpec = Map[String, String]
+
+ /**
+ * Initialize an empty spec.
+ */
+ lazy val emptyTablePartitionSpec: TablePartitionSpec = Map.empty[String, String]
 }

 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 66165c7..a01c312 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -638,12 +638,13 @@ private[hive] class HiveClientImpl(
 table: CatalogTable,
 spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
 val hiveTable = toHiveTable(table, Some(userName))
-val parts = spec match {
- case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
+val partSpec = spec match {
+ case None => CatalogTypes.emptyTablePartitionSpec
 case Some(s) =>
 assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
-client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+s
 }
+val parts = client.getPartitions(hiveTable, partSpec.asJava).asScala.map(fromHivePartition)
 HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
 parts
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 48d0b4a..4f8dab9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -162,21 +162,19 @@ case class HiveTableScanExec(
 // exposed for tests
 @transient lazy val rawPartitions = {
-val
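A hedged sketch of the listing decision described in point 2 above, pulled out of its HiveTableScanExec context; the helper name and parameter list are hypothetical stand-ins for the surrounding fields, while the SessionCatalog calls are the real catalog API:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.Expression

// Push partition predicates down to the metastore only when pruning is enabled
// and there actually are predicates; otherwise list all partitions in one call.
def listHivePartitions(
    catalog: SessionCatalog,
    table: TableIdentifier,
    partitionPruningPred: Seq[Expression],
    metastorePartitionPruning: Boolean): Seq[CatalogTablePartition] = {
  if (metastorePartitionPruning && partitionPruningPred.nonEmpty) {
    catalog.listPartitionsByFilter(table, partitionPruningPred)
  } else {
    catalog.listPartitions(table)
  }
}
```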
spark git commit: [SPARK-21877][DEPLOY, WINDOWS] Handle quotes in Windows command scripts
Repository: spark
Updated Branches:
  refs/heads/master 0c03297bf -> c7b46d4d8

[SPARK-21877][DEPLOY, WINDOWS] Handle quotes in Windows command scripts

## What changes were proposed in this pull request?

All the Windows command scripts cannot handle quotes in parameters. Running a Windows command shell with a parameter that contains quotes reproduces the bug:

```
C:\Users\meng\software\spark-2.2.0-bin-hadoop2.7> bin\spark-shell --driver-java-options " -Dfile.encoding=utf-8 "
'C:\Users\meng\software\spark-2.2.0-bin-hadoop2.7\bin\spark-shell2.cmd" --driver-java-options "' is not recognized as an internal or external command, operable program or batch file.
```

Windows recognizes "--driver-java-options" as part of the command. All the Windows command scripts that contain the following code have the bug.

```
cmd /V /E /C "" %*
```

We should quote the command and parameters, like

```
cmd /V /E /C """ %*"
```

## How was this patch tested?

Tested manually on Windows 10 and Windows 7.

We can verify it by the following demo:

```
C:\Users\meng\program\demo>cat a.cmd
echo off
cmd /V /E /C "b.cmd" %*

C:\Users\meng\program\demo>cat b.cmd
echo off
echo %*

C:\Users\meng\program\demo>cat c.cmd
echo off
cmd /V /E /C ""b.cmd" %*"

C:\Users\meng\program\demo>a.cmd "123"
'b.cmd" "123' is not recognized as an internal or external command, operable program or batch file.

C:\Users\meng\program\demo>c.cmd "123"
"123"
```

With the spark-shell.cmd example, changing it to the following code makes the command execute successfully.

```
cmd /V /E /C ""%~dp0spark-shell2.cmd" %*"
```

```
C:\Users\meng\software\spark-2.2.0-bin-hadoop2.7> bin\spark-shell --driver-java-options " -Dfile.encoding=utf-8 "
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
...
```

Author: minixalpha

Closes #19090 from minixalpha/master.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7b46d4d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7b46d4d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7b46d4d

Branch: refs/heads/master
Commit: c7b46d4d8aa8da24131d79d2bfa36e8db19662e4
Parents: 0c03297
Author: minixalpha
Authored: Fri Oct 6 23:38:47 2017 +0900
Committer: hyukjinkwon
Committed: Fri Oct 6 23:38:47 2017 +0900

--
 bin/beeline.cmd      | 4 +++-
 bin/pyspark.cmd      | 4 +++-
 bin/run-example.cmd  | 5 -
 bin/spark-class.cmd  | 4 +++-
 bin/spark-shell.cmd  | 4 +++-
 bin/spark-submit.cmd | 4 +++-
 bin/sparkR.cmd       | 4 +++-
 7 files changed, 22 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c7b46d4d/bin/beeline.cmd
--
diff --git a/bin/beeline.cmd b/bin/beeline.cmd
index 02464bd..288059a 100644
--- a/bin/beeline.cmd
+++ b/bin/beeline.cmd
@@ -17,4 +17,6 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem
-cmd /V /E /C "%~dp0spark-class.cmd" org.apache.hive.beeline.BeeLine %*
+rem The outermost quotes are used to prevent Windows command line parse error
+rem when there are some quotes in parameters, see SPARK-21877.
+cmd /V /E /C ""%~dp0spark-class.cmd" org.apache.hive.beeline.BeeLine %*"

http://git-wip-us.apache.org/repos/asf/spark/blob/c7b46d4d/bin/pyspark.cmd
--
diff --git a/bin/pyspark.cmd b/bin/pyspark.cmd
index 72d046a..3dcf1d4 100644
--- a/bin/pyspark.cmd
+++ b/bin/pyspark.cmd
@@ -20,4 +20,6 @@ rem
 rem This is the entry point for running PySpark. To avoid polluting the
 rem environment, it just launches a new cmd to do the real work.
-cmd /V /E /C "%~dp0pyspark2.cmd" %*
+rem The outermost quotes are used to prevent Windows command line parse error
+rem when there are some quotes in parameters, see SPARK-21877.
+cmd /V /E /C ""%~dp0pyspark2.cmd" %*"

http://git-wip-us.apache.org/repos/asf/spark/blob/c7b46d4d/bin/run-example.cmd
--
diff --git a/bin/run-example.cmd b/bin/run-example.cmd
index f9b786e..efa5f81 100644
--- a/bin/run-example.cmd
+++ b/bin/run-example.cmd
@@ -19,4 +19,7 @@ rem
 set SPARK_HOME=%~dp0..
 set _SPARK_CMD_USAGE=Usage: ./bin/run-example [options] example-class [example args]
-cmd /V /E /C "%~dp0spark-submit.cmd" run-example %*
+
+rem The outermost quotes are used to prevent Windows command line parse error
+rem when there are some quotes in parameters, see SPARK-21877.
+cmd /V /E /C ""%~dp0spark-submit.cmd" run-example %*"
spark git commit: [SPARK-22142][BUILD][STREAMING] Move Flume support behind a profile, take 2
Repository: spark
Updated Branches:
  refs/heads/master 83488cc31 -> 0c03297bf

[SPARK-22142][BUILD][STREAMING] Move Flume support behind a profile, take 2

## What changes were proposed in this pull request?

Move flume behind a profile, take 2. See https://github.com/apache/spark/pull/19365 for most of the back-story.

This change should fix the problem by removing the examples module dependency and moving Flume examples to the module itself. It also adds deprecation messages, per a discussion on dev@ about deprecating for 2.3.0.

## How was this patch tested?

Existing tests, which still enable flume integration.

Author: Sean Owen

Closes #19412 from srowen/SPARK-22142.2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c03297b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c03297b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c03297b

Branch: refs/heads/master
Commit: 0c03297bf0e87944f9fe0535fdae5518228e3e29
Parents: 83488cc
Author: Sean Owen
Authored: Fri Oct 6 15:08:28 2017 +0100
Committer: Sean Owen
Committed: Fri Oct 6 15:08:28 2017 +0100

--
 dev/create-release/release-build.sh              |  4 +-
 dev/mima                                         |  2 +-
 dev/scalastyle                                   |  1 +
 dev/sparktestsupport/modules.py                  | 20 +-
 dev/test-dependencies.sh                         |  2 +-
 docs/building-spark.md                           |  7 ++
 docs/streaming-flume-integration.md              | 13 ++--
 examples/pom.xml                                 |  7 --
 .../examples/streaming/JavaFlumeEventCount.java  | 69 ---
 .../examples/streaming/FlumeEventCount.scala     | 70
 .../streaming/FlumePollingEventCount.scala       | 67 ---
 .../spark/examples/JavaFlumeEventCount.java      | 67 +++
 .../apache/spark/examples/FlumeEventCount.scala  | 68 +++
 .../spark/examples/FlumePollingEventCount.scala  | 65 ++
 .../spark/streaming/flume/FlumeUtils.scala       |  1 +
 pom.xml                                          | 13 +++-
 project/SparkBuild.scala                         | 17 ++---
 python/pyspark/streaming/flume.py                |  4 ++
 python/pyspark/streaming/tests.py                | 16 -
 19 files changed, 273 insertions(+), 240 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/create-release/release-build.sh
--
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index 5390f59..7e8d5c7 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -84,9 +84,9 @@ MVN="build/mvn --force"
 # Hive-specific profiles for some builds
 HIVE_PROFILES="-Phive -Phive-thriftserver"
 # Profiles for publishing snapshots and release to Maven Central
-PUBLISH_PROFILES="-Pmesos -Pyarn $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Pflume $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
 # Profiles for building binary releases
-BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Psparkr"
+BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Pflume -Psparkr"
 # Scala 2.11 only profiles for some builds
 SCALA_2_11_PROFILES="-Pkafka-0-8"
 # Scala 2.12 only profiles for some builds

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/mima
--
diff --git a/dev/mima b/dev/mima
index fdb21f5..1e3ca97 100755
--- a/dev/mima
+++ b/dev/mima
@@ -24,7 +24,7 @@ set -e
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 cd "$FWDIR"
-SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pflume -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
 TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
 OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/scalastyle
--
diff --git a/dev/scalastyle b/dev/scalastyle
index e5aa589..89ecc8a 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -25,6 +25,7 @@ ERRORS=$(echo -e "q\n" \
 -Pmesos \
 -Pkafka-0-8 \
 -Pyarn \
+-Pflume \
 -Phive \
 -Phive-thriftserver \
 scalastyle test:scalastyle \

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/sparktestsupport/modules.py
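The Flume integration API itself is unchanged by this commit; only its build activation moves behind the new -Pflume profile (with deprecation warnings added to FlumeUtils). A condensed sketch in the spirit of the relocated FlumeEventCount example, assuming a Flume Avro sink pointed at localhost:41414; the application only compiles against a spark-streaming-flume artifact built with that profile enabled:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeEventCountSketch {
  def main(args: Array[String]): Unit = {
    // Count Flume events received on a local Avro sink every two seconds.
    val ssc = new StreamingContext(new SparkConf().setAppName("FlumeEventCount"), Seconds(2))
    val stream = FlumeUtils.createStream(ssc, "localhost", 41414)
    stream.count().map(cnt => s"Received $cnt flume events.").print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```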
spark git commit: [SPARK-21871][SQL] Fix infinite loop when bytecode size is larger than spark.sql.codegen.hugeMethodLimit
Repository: spark
Updated Branches:
  refs/heads/master ae61f187a -> 83488cc31

[SPARK-21871][SQL] Fix infinite loop when bytecode size is larger than spark.sql.codegen.hugeMethodLimit

## What changes were proposed in this pull request?

When exceeding `spark.sql.codegen.hugeMethodLimit`, the runtime falls back to the Volcano iterator solution. This could cause an infinite loop when `FileSourceScanExec` can use the columnar batch to read the data. This PR is to fix the issue.

## How was this patch tested?

Added a test

Author: gatorsmile

Closes #19440 from gatorsmile/testt.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83488cc3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83488cc3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83488cc3

Branch: refs/heads/master
Commit: 83488cc3180ca18f829516f550766efb3095881e
Parents: ae61f18
Author: gatorsmile
Authored: Thu Oct 5 23:33:49 2017 -0700
Committer: gatorsmile
Committed: Thu Oct 5 23:33:49 2017 -0700

--
 .../sql/execution/WholeStageCodegenExec.scala  | 12 ++
 .../sql/execution/WholeStageCodegenSuite.scala | 23 ++--
 2 files changed, 29 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/83488cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 9073d59..1aaaf89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -392,12 +392,16 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
 // Check if compiled code has a too large function
 if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
- logWarning(s"Found too long generated codes and JIT optimization might not work: " +
-s"the bytecode size was $maxCodeSize, this value went over the limit " +
+ logInfo(s"Found too long generated codes and JIT optimization might not work: " +
+s"the bytecode size ($maxCodeSize) is above the limit " +
 s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
 s"for this plan. To avoid this, you can raise the limit " +
-s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString")
- return child.execute()
+s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
+ child match {
+// The fallback solution of batch file source scan still uses WholeStageCodegenExec
+case f: FileSourceScanExec if f.supportsBatch => // do nothing
+case _ => return child.execute()
+ }
 }
 val references = ctx.references.toArray

http://git-wip-us.apache.org/repos/asf/spark/blob/83488cc3/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index aaa77b3..098e4cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
@@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
-class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
+class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
 test("range/filter should be combined") {
 val df = spark.range(10).filter("id = 1").selectExpr("id + 1")
@@ -185,4 +185,23 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
 val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
 assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
 }
+
+ test("bytecode of batch file scan
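A small way to exercise the fallback path this commit fixes, loosely modeled on the added test; the scratch path and the tiny limit below are illustrative choices, not values taken from the patch:

```scala
import org.apache.spark.sql.SparkSession

object HugeMethodLimitFallback {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("huge-method-limit").getOrCreate()

    // An artificially small limit makes the generated code for almost any plan
    // exceed it, so whole-stage codegen is disabled and the fallback path is taken.
    spark.conf.set("spark.sql.codegen.hugeMethodLimit", "200")

    val path = "/tmp/huge-method-limit-demo" // hypothetical scratch location
    spark.range(10).write.mode("overwrite").parquet(path)

    // Before this fix, the batch (columnar) parquet scan could loop forever once
    // codegen was disabled for the plan; now the read completes normally.
    assert(spark.read.parquet(path).collect().length == 10)
    spark.stop()
  }
}
```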