spark git commit: [SPARK-21549][CORE] Respect OutputFormats with no output directory provided

2017-10-06 Thread mridulm80
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 8a4e7dd89 -> 0d3f1667e


[SPARK-21549][CORE] Respect OutputFormats with no output directory provided

## What changes were proposed in this pull request?

Fix for https://issues.apache.org/jira/browse/SPARK-21549 JIRA issue.

Since version 2.2, Spark does not respect OutputFormats with no output path
provided.
Examples of such formats are [Cassandra
OutputFormat](https://github.com/finn-no/cassandra-hadoop/blob/08dfa3a7ac727bb87269f27a1c82ece54e3f67e6/src/main/java/org/apache/cassandra/hadoop2/AbstractColumnFamilyOutputFormat.java)
and [Aerospike
OutputFormat](https://github.com/aerospike/aerospike-hadoop/blob/master/mapreduce/src/main/java/com/aerospike/hadoop/mapreduce/AerospikeOutputFormat.java),
which have no way to roll back results already written to an external system
on job failure.

Spark requires the provided output directory so that files can be committed to
an absolute output location; that is not the case for output formats which
write data to external systems.

This pull request prevents access to the `absPathStagingDir` method, which
causes the error described in SPARK-21549, unless there are files to rename in
`addedAbsPathFiles`.
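
For context, a hedged sketch of the kind of job that hits this code path: writing through an 
`OutputFormat` that targets an external store, with no output directory configured. 
`ExternalStoreOutputFormat` and `pairRdd` are hypothetical placeholders, not part of this patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job

// Assumed: pairRdd is an RDD[(Text, Text)] and ExternalStoreOutputFormat writes
// directly to an external system (as the Cassandra/Aerospike formats above do).
val job = Job.getInstance(new Configuration())
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[Text])
job.setOutputFormatClass(classOf[ExternalStoreOutputFormat])
// Deliberately no FileOutputFormat.setOutputPath(job, ...): the committer's path is null.

// Before this fix, committing the job touched absPathStagingDir (built from the null
// path) and failed even though there was nothing to move.
pairRdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
```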

## How was this patch tested?

Unit tests

Author: Sergey Zhemzhitsky 

Closes #19294 from szhem/SPARK-21549-abs-output-commits.

(cherry picked from commit 2030f19511f656e9534f3fd692e622e45f9a074e)
Signed-off-by: Mridul Muralidharan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d3f1667
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d3f1667
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d3f1667

Branch: refs/heads/branch-2.2
Commit: 0d3f1667e92debbe2c3d412ea8e9c6eba752ef53
Parents: 8a4e7dd
Author: Sergey Zhemzhitsky 
Authored: Fri Oct 6 20:43:53 2017 -0700
Committer: Mridul Muralidharan 
Committed: Fri Oct 6 20:44:47 2017 -0700

--
 .../io/HadoopMapReduceCommitProtocol.scala  | 28 +
 .../spark/rdd/PairRDDFunctionsSuite.scala   | 33 +++-
 2 files changed, 54 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0d3f1667/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 22e2679..4d42a66 100644
--- 
a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ 
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -35,6 +35,9 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
  * (from the newer mapreduce API, not the old mapred API).
  *
  * Unlike Hadoop's OutputCommitter, this implementation is serializable.
+ *
+ * @param jobId the job's or stage's id
+ * @param path the job's output path, or null if committer acts as a noop
  */
 class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   extends FileCommitProtocol with Serializable with Logging {
@@ -57,6 +60,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
    */
   private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
 
+  /**
+   * Checks whether there are files to be committed to an absolute output location.
+   *
+   * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
+   * it is necessary to check whether the output path is specified. Output path may not be required
+   * for committers not writing to distributed file systems.
+   */
+  private def hasAbsPathFiles: Boolean = path != null
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.newInstance()
     // If OutputFormat is Configurable, we should set conf to it.
@@ -129,17 +141,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
     val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
       .foldLeft(Map[String, String]())(_ ++ _)
     logDebug(s"Committing files staged for absolute locations $filesToMove")
-    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-    for ((src, dst) <- filesToMove) {
-      fs.rename(new Path(src), new Path(dst))
+    if (hasAbsPathFiles) {
+      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+      for ((src, dst) <- filesToMove) {
+        fs.rename(new Path(src), new Path(dst))
+      }
+      fs.delete(absPathStagingDir, true)
   

spark git commit: [SPARK-21549][CORE] Respect OutputFormats with no output directory provided

2017-10-06 Thread mridulm80
Repository: spark
Updated Branches:
  refs/heads/master debcbec74 -> 2030f1951


[SPARK-21549][CORE] Respect OutputFormats with no output directory provided

## What changes were proposed in this pull request?

Fix for https://issues.apache.org/jira/browse/SPARK-21549 JIRA issue.

Since version 2.2, Spark does not respect OutputFormats with no output path
provided.
Examples of such formats are [Cassandra
OutputFormat](https://github.com/finn-no/cassandra-hadoop/blob/08dfa3a7ac727bb87269f27a1c82ece54e3f67e6/src/main/java/org/apache/cassandra/hadoop2/AbstractColumnFamilyOutputFormat.java)
and [Aerospike
OutputFormat](https://github.com/aerospike/aerospike-hadoop/blob/master/mapreduce/src/main/java/com/aerospike/hadoop/mapreduce/AerospikeOutputFormat.java),
which have no way to roll back results already written to an external system
on job failure.

Spark requires the provided output directory so that files can be committed to
an absolute output location; that is not the case for output formats which
write data to external systems.

This pull request prevents access to the `absPathStagingDir` method, which
causes the error described in SPARK-21549, unless there are files to rename in
`addedAbsPathFiles`.
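
Condensed, the essence of the fix reads like the sketch below — a simplified view of
`commitJob`, not the exact method body:

```scala
// Only touch the absolute-path staging directory when an output path was provided;
// committers for external systems construct the protocol with path = null.
private def hasAbsPathFiles: Boolean = path != null

override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
  committer.commitJob(jobContext)
  val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
    .foldLeft(Map[String, String]())(_ ++ _)
  if (hasAbsPathFiles) {
    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
    for ((src, dst) <- filesToMove) fs.rename(new Path(src), new Path(dst))
    fs.delete(absPathStagingDir, true)
  }
}
```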

## How was this patch tested?

Unit tests

Author: Sergey Zhemzhitsky 

Closes #19294 from szhem/SPARK-21549-abs-output-commits.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2030f195
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2030f195
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2030f195

Branch: refs/heads/master
Commit: 2030f19511f656e9534f3fd692e622e45f9a074e
Parents: debcbec
Author: Sergey Zhemzhitsky 
Authored: Fri Oct 6 20:43:53 2017 -0700
Committer: Mridul Muralidharan 
Committed: Fri Oct 6 20:43:53 2017 -0700

--
 .../io/HadoopMapReduceCommitProtocol.scala  | 28 +
 .../spark/rdd/PairRDDFunctionsSuite.scala   | 33 +++-
 2 files changed, 54 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2030f195/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index b1d07ab..a7e6859 100644
--- 
a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ 
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -35,6 +35,9 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
  * (from the newer mapreduce API, not the old mapred API).
  *
  * Unlike Hadoop's OutputCommitter, this implementation is serializable.
+ *
+ * @param jobId the job's or stage's id
+ * @param path the job's output path, or null if committer acts as a noop
  */
 class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   extends FileCommitProtocol with Serializable with Logging {
@@ -57,6 +60,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
    */
   private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
 
+  /**
+   * Checks whether there are files to be committed to an absolute output location.
+   *
+   * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
+   * it is necessary to check whether the output path is specified. Output path may not be required
+   * for committers not writing to distributed file systems.
+   */
+  private def hasAbsPathFiles: Boolean = path != null
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.newInstance()
     // If OutputFormat is Configurable, we should set conf to it.
@@ -130,17 +142,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
     val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
       .foldLeft(Map[String, String]())(_ ++ _)
     logDebug(s"Committing files staged for absolute locations $filesToMove")
-    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-    for ((src, dst) <- filesToMove) {
-      fs.rename(new Path(src), new Path(dst))
+    if (hasAbsPathFiles) {
+      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+      for ((src, dst) <- filesToMove) {
+        fs.rename(new Path(src), new Path(dst))
+      }
+      fs.delete(absPathStagingDir, true)
     }
-    fs.delete(absPathStagingDir, true)
   }
 
   override def abortJob(jobContext: JobContext): Unit = {
 

spark git commit: [SPARK-21947][SS] Check and report error when monotonically_increasing_id is used in streaming query

2017-10-06 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 08b204fd2 -> debcbec74


[SPARK-21947][SS] Check and report error when monotonically_increasing_id is 
used in streaming query

## What changes were proposed in this pull request?

`monotonically_increasing_id` doesn't work in Structured Streaming. We should 
throw an exception if a streaming query uses it.
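
To make the failure mode concrete, here is a hedged sketch of a query that this check now
rejects at analysis time; the socket source, host, and port are illustrative only, and the
error text is paraphrased from the new check.

```scala
import org.apache.spark.sql.functions.monotonically_increasing_id

// Illustrative streaming source; host/port are placeholders.
val events = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// After this change, starting the query fails with an AnalysisException along the lines of:
//   Expression(s): monotonically_increasing_id() is not supported with streaming DataFrames/Datasets
val query = events
  .withColumn("id", monotonically_increasing_id())
  .writeStream
  .format("console")
  .start()
```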

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh 

Closes #19336 from viirya/SPARK-21947.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/debcbec7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/debcbec7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/debcbec7

Branch: refs/heads/master
Commit: debcbec7491d3a23b19ef149e50d2887590b6de0
Parents: 08b204f
Author: Liang-Chi Hsieh 
Authored: Fri Oct 6 13:10:04 2017 -0700
Committer: Shixiong Zhu 
Committed: Fri Oct 6 13:10:04 2017 -0700

--
 .../analysis/UnsupportedOperationChecker.scala   | 15 ++-
 .../analysis/UnsupportedOperationsSuite.scala| 10 +-
 2 files changed, 23 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/debcbec7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index dee6fbe..04502d0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, MonotonicallyIncreasingID}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans._
@@ -129,6 +129,16 @@ object UnsupportedOperationChecker {
       !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
     }
 
+    def checkUnsupportedExpressions(implicit operator: LogicalPlan): Unit = {
+      val unsupportedExprs = operator.expressions.flatMap(_.collect {
+        case m: MonotonicallyIncreasingID => m
+      }).distinct
+      if (unsupportedExprs.nonEmpty) {
+        throwError("Expression(s): " + unsupportedExprs.map(_.sql).mkString(", ") +
+          " is not supported with streaming DataFrames/Datasets")
+      }
+    }
+
     plan.foreachUp { implicit subPlan =>
 
       // Operations that cannot exists anywhere in a streaming plan
@@ -323,6 +333,9 @@ object UnsupportedOperationChecker {
 
         case _ =>
       }
+
+      // Check if there are unsupported expressions in streaming query plan.
+      checkUnsupportedExpressions(subPlan)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/debcbec7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index e5057c4..60d1351 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, MonotonicallyIncreasingID, NamedExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _}
@@ -614,6 +614,14 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   testOutputMode(Update, shouldSupportAggregation = true, 

spark git commit: [SPARK-22214][SQL] Refactor the list hive partitions code

2017-10-06 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master c7b46d4d8 -> 08b204fd2


[SPARK-22214][SQL] Refactor the list hive partitions code

## What changes were proposed in this pull request?

In this PR we make a few changes to the Hive partition-listing code to make it
more extensible.
The following changes are made:
1. In `HiveClientImpl.getPartitions()`, call `client.getPartitions` instead of
`shim.getAllPartitions` when `spec` is empty;
2. In `HiveTableScanExec`, previously we always called `listPartitionsByFilter`
when the config `metastorePartitionPruning` was enabled; now we call
`listPartitions` when `partitionPruningPred` is empty (see the sketch after
this list);
3. Use `sessionCatalog` instead of `SharedState.externalCatalog` in
`HiveTableScanExec`.
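
A hedged sketch of the resulting decision in `HiveTableScanExec` (names such as
`relation.tableMeta` and `partitionPruningPred` follow the PR description and surrounding
code; this is not the exact production snippet):

```scala
// Simplified partition-listing choice after the refactor: always go through the
// session catalog, and only push predicates to the metastore when there are any.
val rawPartitions =
  if (partitionPruningPred.nonEmpty &&
      sparkSession.sessionState.conf.metastorePartitionPruning) {
    sparkSession.sessionState.catalog
      .listPartitionsByFilter(relation.tableMeta.identifier, partitionPruningPred)
  } else {
    sparkSession.sessionState.catalog
      .listPartitions(relation.tableMeta.identifier)
  }
```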

## How was this patch tested?

Tested by existing test cases; since this is a code refactor, no regression or
behavior change is expected.

Author: Xingbo Jiang 

Closes #19444 from jiangxb1987/hivePartitions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08b204fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08b204fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08b204fd

Branch: refs/heads/master
Commit: 08b204fd2c731e87d3bc2cc0bccb6339ef7e3a6e
Parents: c7b46d4
Author: Xingbo Jiang 
Authored: Fri Oct 6 12:53:35 2017 -0700
Committer: gatorsmile 
Committed: Fri Oct 6 12:53:35 2017 -0700

--
 .../spark/sql/catalyst/catalog/interface.scala  |  5 
 .../spark/sql/hive/client/HiveClientImpl.scala  |  7 ++---
 .../sql/hive/execution/HiveTableScanExec.scala  | 28 +---
 3 files changed, 22 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index fe2af91..975b084 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -405,6 +405,11 @@ object CatalogTypes {
    * Specifications of a table partition. Mapping column name to column value.
    */
   type TablePartitionSpec = Map[String, String]
+
+  /**
+   * Initialize an empty spec.
+   */
+  lazy val emptyTablePartitionSpec: TablePartitionSpec = Map.empty[String, String]
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 66165c7..a01c312 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -638,12 +638,13 @@ private[hive] class HiveClientImpl(
       table: CatalogTable,
       spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
     val hiveTable = toHiveTable(table, Some(userName))
-    val parts = spec match {
-      case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
+    val partSpec = spec match {
+      case None => CatalogTypes.emptyTablePartitionSpec
       case Some(s) =>
         assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
-        client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+        s
     }
+    val parts = client.getPartitions(hiveTable, partSpec.asJava).asScala.map(fromHivePartition)
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 48d0b4a..4f8dab9 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -162,21 +162,19 @@ case class HiveTableScanExec(
 
   // exposed for tests
   @transient lazy val rawPartitions = {
-val 

spark git commit: [SPARK-21877][DEPLOY, WINDOWS] Handle quotes in Windows command scripts

2017-10-06 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 0c03297bf -> c7b46d4d8


[SPARK-21877][DEPLOY, WINDOWS] Handle quotes in Windows command scripts

## What changes were proposed in this pull request?

None of the Windows command scripts can handle quotes in parameters.

Running a Windows command script with a parameter that contains quotes
reproduces the bug:

```
C:\Users\meng\software\spark-2.2.0-bin-hadoop2.7> bin\spark-shell --driver-java-options " -Dfile.encoding=utf-8 "
'C:\Users\meng\software\spark-2.2.0-bin-hadoop2.7\bin\spark-shell2.cmd" --driver-java-options "' is not recognized as an internal or external command,
operable program or batch file.
```

Windows treats `--driver-java-options` as part of the command.
All the Windows command scripts that contain the following code have the bug.

```
cmd /V /E /C "<other script>" %*
```

We should quote the command and parameters like

```
cmd /V /E /C ""<other script>" %*"
```

## How was this patch tested?

Tested manually on Windows 10 and Windows 7.

We can verify it by the following demo:

```
C:\Users\meng\program\demo>cat a.cmd
echo off
cmd /V /E /C "b.cmd" %*

C:\Users\meng\program\demo>cat b.cmd
echo off
echo %*

C:\Users\meng\program\demo>cat c.cmd
echo off
cmd /V /E /C ""b.cmd" %*"

C:\Users\meng\program\demo>a.cmd "123"
'b.cmd" "123' is not recognized as an internal or external command,
operable program or batch file.

C:\Users\meng\program\demo>c.cmd "123"
"123"
```

Taking spark-shell.cmd as an example, changing it to the following code makes
the command execute successfully.

```
cmd /V /E /C ""%~dp0spark-shell2.cmd" %*"
```

```
C:\Users\meng\software\spark-2.2.0-bin-hadoop2.7> bin\spark-shell --driver-java-options " -Dfile.encoding=utf-8 "
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
...

```

Author: minixalpha 

Closes #19090 from minixalpha/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7b46d4d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7b46d4d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7b46d4d

Branch: refs/heads/master
Commit: c7b46d4d8aa8da24131d79d2bfa36e8db19662e4
Parents: 0c03297
Author: minixalpha 
Authored: Fri Oct 6 23:38:47 2017 +0900
Committer: hyukjinkwon 
Committed: Fri Oct 6 23:38:47 2017 +0900

--
 bin/beeline.cmd  | 4 +++-
 bin/pyspark.cmd  | 4 +++-
 bin/run-example.cmd  | 5 -
 bin/spark-class.cmd  | 4 +++-
 bin/spark-shell.cmd  | 4 +++-
 bin/spark-submit.cmd | 4 +++-
 bin/sparkR.cmd   | 4 +++-
 7 files changed, 22 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c7b46d4d/bin/beeline.cmd
--
diff --git a/bin/beeline.cmd b/bin/beeline.cmd
index 02464bd..288059a 100644
--- a/bin/beeline.cmd
+++ b/bin/beeline.cmd
@@ -17,4 +17,6 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem
 
-cmd /V /E /C "%~dp0spark-class.cmd" org.apache.hive.beeline.BeeLine %*
+rem The outermost quotes are used to prevent Windows command line parse error
+rem when there are some quotes in parameters, see SPARK-21877.
+cmd /V /E /C ""%~dp0spark-class.cmd" org.apache.hive.beeline.BeeLine %*"

http://git-wip-us.apache.org/repos/asf/spark/blob/c7b46d4d/bin/pyspark.cmd
--
diff --git a/bin/pyspark.cmd b/bin/pyspark.cmd
index 72d046a..3dcf1d4 100644
--- a/bin/pyspark.cmd
+++ b/bin/pyspark.cmd
@@ -20,4 +20,6 @@ rem
 rem This is the entry point for running PySpark. To avoid polluting the
 rem environment, it just launches a new cmd to do the real work.
 
-cmd /V /E /C "%~dp0pyspark2.cmd" %*
+rem The outermost quotes are used to prevent Windows command line parse error
+rem when there are some quotes in parameters, see SPARK-21877.
+cmd /V /E /C ""%~dp0pyspark2.cmd" %*"

http://git-wip-us.apache.org/repos/asf/spark/blob/c7b46d4d/bin/run-example.cmd
--
diff --git a/bin/run-example.cmd b/bin/run-example.cmd
index f9b786e..efa5f81 100644
--- a/bin/run-example.cmd
+++ b/bin/run-example.cmd
@@ -19,4 +19,7 @@ rem
 
 set SPARK_HOME=%~dp0..
 set _SPARK_CMD_USAGE=Usage: ./bin/run-example [options] example-class [example args]
-cmd /V /E /C "%~dp0spark-submit.cmd" run-example %*
+
+rem The outermost quotes are used to prevent Windows command line parse error
+rem when there are some quotes in parameters, see SPARK-21877.
+cmd /V /E /C ""%~dp0spark-submit.cmd" run-example %*"


spark git commit: [SPARK-22142][BUILD][STREAMING] Move Flume support behind a profile, take 2

2017-10-06 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 83488cc31 -> 0c03297bf


[SPARK-22142][BUILD][STREAMING] Move Flume support behind a profile, take 2

## What changes were proposed in this pull request?

Move flume behind a profile, take 2. See 
https://github.com/apache/spark/pull/19365 for most of the back-story.

This change should fix the problem by removing the examples module dependency 
and moving Flume examples to the module itself. It also adds deprecation 
messages, per a discussion on dev about deprecating for 2.3.0.
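
For orientation, a hedged sketch of the DStream-based Flume API that now sits behind the
`flume` profile (host and port are placeholders; this mirrors the relocated FlumeEventCount
example rather than adding anything new):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical Flume sink address; requires building Spark with -Pflume so that
// spark-streaming-flume is available on the classpath.
val conf = new SparkConf().setAppName("FlumeEventCountSketch")
val ssc = new StreamingContext(conf, Seconds(10))
val stream = FlumeUtils.createStream(ssc, "localhost", 41414)
stream.count().map(cnt => s"Received $cnt flume events.").print()
ssc.start()
ssc.awaitTermination()
```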

## How was this patch tested?

Existing tests, which still enable Flume integration.

Author: Sean Owen 

Closes #19412 from srowen/SPARK-22142.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c03297b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c03297b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c03297b

Branch: refs/heads/master
Commit: 0c03297bf0e87944f9fe0535fdae5518228e3e29
Parents: 83488cc
Author: Sean Owen 
Authored: Fri Oct 6 15:08:28 2017 +0100
Committer: Sean Owen 
Committed: Fri Oct 6 15:08:28 2017 +0100

--
 dev/create-release/release-build.sh |  4 +-
 dev/mima|  2 +-
 dev/scalastyle  |  1 +
 dev/sparktestsupport/modules.py | 20 +-
 dev/test-dependencies.sh|  2 +-
 docs/building-spark.md  |  7 ++
 docs/streaming-flume-integration.md | 13 ++--
 examples/pom.xml|  7 --
 .../examples/streaming/JavaFlumeEventCount.java | 69 ---
 .../examples/streaming/FlumeEventCount.scala| 70 
 .../streaming/FlumePollingEventCount.scala  | 67 ---
 .../spark/examples/JavaFlumeEventCount.java | 67 +++
 .../apache/spark/examples/FlumeEventCount.scala | 68 +++
 .../spark/examples/FlumePollingEventCount.scala | 65 ++
 .../spark/streaming/flume/FlumeUtils.scala  |  1 +
 pom.xml | 13 +++-
 project/SparkBuild.scala| 17 ++---
 python/pyspark/streaming/flume.py   |  4 ++
 python/pyspark/streaming/tests.py   | 16 -
 19 files changed, 273 insertions(+), 240 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/create-release/release-build.sh
--
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index 5390f59..7e8d5c7 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -84,9 +84,9 @@ MVN="build/mvn --force"
 # Hive-specific profiles for some builds
 HIVE_PROFILES="-Phive -Phive-thriftserver"
 # Profiles for publishing snapshots and release to Maven Central
-PUBLISH_PROFILES="-Pmesos -Pyarn $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Pflume $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
 # Profiles for building binary releases
-BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Psparkr"
+BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Pflume -Psparkr"
 # Scala 2.11 only profiles for some builds
 SCALA_2_11_PROFILES="-Pkafka-0-8"
 # Scala 2.12 only profiles for some builds

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/mima
--
diff --git a/dev/mima b/dev/mima
index fdb21f5..1e3ca97 100755
--- a/dev/mima
+++ b/dev/mima
@@ -24,7 +24,7 @@ set -e
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 cd "$FWDIR"
 
-SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pflume -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
 TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
 OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/scalastyle
--
diff --git a/dev/scalastyle b/dev/scalastyle
index e5aa589..89ecc8a 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -25,6 +25,7 @@ ERRORS=$(echo -e "q\n" \
 -Pmesos \
 -Pkafka-0-8 \
 -Pyarn \
+-Pflume \
 -Phive \
 -Phive-thriftserver \
 scalastyle test:scalastyle \

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/sparktestsupport/modules.py

spark git commit: [SPARK-21871][SQL] Fix infinite loop when bytecode size is larger than spark.sql.codegen.hugeMethodLimit

2017-10-06 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master ae61f187a -> 83488cc31


[SPARK-21871][SQL] Fix infinite loop when bytecode size is larger than 
spark.sql.codegen.hugeMethodLimit

## What changes were proposed in this pull request?
When the generated bytecode exceeds `spark.sql.codegen.hugeMethodLimit`, the
runtime falls back to the Volcano iterator solution. This could cause an
infinite loop when `FileSourceScanExec` can use the columnar batch to read the
data. This PR fixes the issue.
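
A hedged sketch of the knob and the scenario involved; the path and limit value are
illustrative only, and whether the loop actually triggered depended on the generated
code size relative to the limit:

```scala
// spark.sql.codegen.hugeMethodLimit bounds the bytecode size of a generated method
// before whole-stage codegen is abandoned (value shown here only for illustration).
spark.conf.set("spark.sql.codegen.hugeMethodLimit", "8000")

// Parquet scans can go through the columnar batch path (FileSourceScanExec.supportsBatch).
// Before this fix, abandoning codegen for such a plan fell back to child.execute(),
// which re-entered WholeStageCodegenExec and looped forever; now the batch file scan
// keeps using WholeStageCodegenExec instead.
spark.range(100).write.mode("overwrite").parquet("/tmp/spark-21871-demo")
spark.read.parquet("/tmp/spark-21871-demo").filter("id > 10").collect()
```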

## How was this patch tested?
Added a test

Author: gatorsmile 

Closes #19440 from gatorsmile/testt.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83488cc3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83488cc3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83488cc3

Branch: refs/heads/master
Commit: 83488cc3180ca18f829516f550766efb3095881e
Parents: ae61f18
Author: gatorsmile 
Authored: Thu Oct 5 23:33:49 2017 -0700
Committer: gatorsmile 
Committed: Thu Oct 5 23:33:49 2017 -0700

--
 .../sql/execution/WholeStageCodegenExec.scala   | 12 ++
 .../sql/execution/WholeStageCodegenSuite.scala  | 23 ++--
 2 files changed, 29 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/83488cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 9073d59..1aaaf89 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -392,12 +392,16 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
 
     // Check if compiled code has a too large function
     if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
-      logWarning(s"Found too long generated codes and JIT optimization might not work: " +
-        s"the bytecode size was $maxCodeSize, this value went over the limit " +
+      logInfo(s"Found too long generated codes and JIT optimization might not work: " +
+        s"the bytecode size ($maxCodeSize) is above the limit " +
         s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
         s"for this plan. To avoid this, you can raise the limit " +
-        s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString")
-      return child.execute()
+        s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
+      child match {
+        // The fallback solution of batch file source scan still uses WholeStageCodegenExec
+        case f: FileSourceScanExec if f.supportsBatch => // do nothing
+        case _ => return child.execute()
+      }
     }
 
     val references = ctx.references.toArray

http://git-wip-us.apache.org/repos/asf/spark/blob/83488cc3/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index aaa77b3..098e4cf 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
@@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
-class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
+class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
 
   test("range/filter should be combined") {
     val df = spark.range(10).filter("id = 1").selectExpr("id + 1")
@@ -185,4 +185,23 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
     assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
   }
+
+  test("bytecode of batch file scan