spark git commit: [SPARK-18726][SQL] resolveRelation for FileFormat DataSource don't need to listFiles twice

2017-03-02 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master e24f21b5f -> 982f3223b


[SPARK-18726][SQL] resolveRelation for FileFormat DataSource don't need to 
listFiles twice

## What changes were proposed in this pull request?

Currently, when we resolve a relation for a `FileFormat DataSource` without a
user-provided schema, `listFiles` is executed twice in `InMemoryFileIndex`
during `resolveRelation`.

This PR adds a `FileStatusCache` to `DataSource`, which avoids listing the files twice.

However, there is a bug in `InMemoryFileIndex`; see
 [SPARK-19748](https://github.com/apache/spark/pull/17079) and
 [SPARK-19761](https://github.com/apache/spark/pull/17093),
so this PR should land after SPARK-19748 and SPARK-19761.
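
As a rough, self-contained sketch (not Spark's internal `FileStatusCache` API; all names here are illustrative), the idea is simply that schema inference and the final file index share one cache, so the file system is listed once:

```
import scala.collection.mutable

final case class FileStatusLite(path: String, length: Long)

// A toy cache keyed by path: the file system is consulted only on a cache miss.
class SimpleFileStatusCache(listFromFs: String => Seq[FileStatusLite]) {
  private val cache = mutable.Map.empty[String, Seq[FileStatusLite]]
  def listFiles(path: String): Seq[FileStatusLite] =
    cache.getOrElseUpdate(path, listFromFs(path))
}

object SharedCacheDemo extends App {
  var fsCalls = 0
  val cache = new SimpleFileStatusCache(path => {
    fsCalls += 1
    Seq(FileStatusLite(s"$path/part-00000", 128L))
  })
  cache.listFiles("/data/table")  // first pass: schema inference lists the files
  cache.listFiles("/data/table")  // second pass: building the file index reuses the listing
  assert(fsCalls == 1, "the file system should be listed only once")
}
```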

## How was this patch tested?
unit test added

Author: windpiger 

Closes #17081 from windpiger/resolveDataSourceScanFilesTwice.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/982f3223
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/982f3223
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/982f3223

Branch: refs/heads/master
Commit: 982f3223b4f55f988091402063fe8746c5e2cee4
Parents: e24f21b
Author: windpiger 
Authored: Thu Mar 2 23:54:01 2017 -0800
Committer: Wenchen Fan 
Committed: Thu Mar 2 23:54:01 2017 -0800

--
 .../spark/sql/execution/datasources/DataSource.scala   | 13 +
 .../sql/hive/PartitionedTablePerfStatsSuite.scala  | 11 +++
 2 files changed, 20 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/982f3223/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index c1353d4..4947dfd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -106,10 +106,13 @@ case class DataSource(
    * be any further inference in any triggers.
    *
    * @param format the file format object for this DataSource
+   * @param fileStatusCache the shared cache for file statuses to speed up listing
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */
-  private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
+  private def getOrInferFileFormatSchema(
+      format: FileFormat,
+      fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
     // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
     // in streaming mode, we have already inferred and registered partition columns, we will
     // never have to materialize the lazy val below
@@ -122,7 +125,7 @@ case class DataSource(
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
-      new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
+      new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
     }
     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
@@ -354,7 +357,8 @@ case class DataSource(
           globPath
         }.toArray

-    val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)

     val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
         catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -364,7 +368,8 @@ case class DataSource(
         catalogTable.get,
         catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
     } else {
-      new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
+      new InMemoryFileIndex(
+        sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
     }

     HadoopFsRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/982f3223/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
--
diff --git 

spark git commit: [SPARK-19779][SS] Delete needless tmp file after restart structured streaming job

2017-03-02 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3a7591ad5 -> 1237aaea2


[SPARK-19779][SS] Delete needless tmp file after restart structured streaming 
job

## What changes were proposed in this pull request?

[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)

PR https://github.com/apache/spark/pull/17012 fixed restarting a Structured
Streaming application that uses HDFS as the file system, but a problem remains:
a temporary delta file is still left behind in HDFS, and Structured Streaming
never deletes this tmp file after the streaming job is restarted.
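
Condensed, the patched commit path boils down to the following (the helper name is ours; `fs`, `tempDeltaFile`, and `finalDeltaFile` mirror the diff below):

```
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// If a previous run already produced the final delta file, the temp file is useless:
// delete it instead of leaving it behind on HDFS. Otherwise, rename as before.
def commitDelta(fs: FileSystem, tempDeltaFile: Path, finalDeltaFile: Path): Unit = {
  if (fs.exists(finalDeltaFile)) {
    fs.delete(tempDeltaFile, true)
  } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
    throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
  }
}
```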

## How was this patch tested?
 unit tests

Author: guifeng 

Closes #17124 from gf53520/SPARK-19779.

(cherry picked from commit e24f21b5f8365ed25346e986748b393e0b4be25c)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1237aaea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1237aaea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1237aaea

Branch: refs/heads/branch-2.1
Commit: 1237aaea279d6aac504ae1e3265c0b53779b5303
Parents: 3a7591a
Author: guifeng 
Authored: Thu Mar 2 21:19:29 2017 -0800
Committer: Shixiong Zhu 
Committed: Thu Mar 2 21:19:40 2017 -0800

--
 .../streaming/state/HDFSBackedStateStoreProvider.scala| 4 +++-
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 7 +++
 2 files changed, 10 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1237aaea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 2d29940..ab1204a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider(
       // semantically correct because Structured Streaming requires rerunning a batch should
       // generate the same output. (SPARK-19677)
       // scalastyle:on
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)

http://git-wip-us.apache.org/repos/asf/spark/blob/1237aaea/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 21a0a10..255378c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io.{File, IOException}
 import java.net.URI
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = newStoreProvider(hadoopConf = conf)
     provider.getStore(0).commit()
     provider.getStore(0).commit()
+
+    // Verify we don't leak temp files
+    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+      null, true).asScala.filter(_.getName.startsWith("temp-"))
+    assert(tempFiles.isEmpty)
   }

   test("corrupted file handling") {
 
   test("corrupted file handling") {





spark git commit: [SPARK-19779][SS] Delete needless tmp file after restart structured streaming job

2017-03-02 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 491b47a16 -> 73801880f


[SPARK-19779][SS] Delete needless tmp file after restart structured streaming 
job

## What changes were proposed in this pull request?

[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)

PR https://github.com/apache/spark/pull/17012 fixed restarting a Structured
Streaming application that uses HDFS as the file system, but a problem remains:
a temporary delta file is still left behind in HDFS, and Structured Streaming
never deletes this tmp file after the streaming job is restarted.

## How was this patch tested?
 unit tests

Author: guifeng 

Closes #17124 from gf53520/SPARK-19779.

(cherry picked from commit e24f21b5f8365ed25346e986748b393e0b4be25c)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73801880
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73801880
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73801880

Branch: refs/heads/branch-2.0
Commit: 73801880fcc79b611e796dd0804b8b6df12da4e9
Parents: 491b47a
Author: guifeng 
Authored: Thu Mar 2 21:19:29 2017 -0800
Committer: Shixiong Zhu 
Committed: Thu Mar 2 21:19:49 2017 -0800

--
 .../streaming/state/HDFSBackedStateStoreProvider.scala| 4 +++-
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 7 +++
 2 files changed, 10 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/73801880/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index f234ca6..759b1c6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -263,7 +263,9 @@ private[state] class HDFSBackedStateStoreProvider(
       // semantically correct because Structured Streaming requires rerunning a batch should
       // generate the same output. (SPARK-19677)
       // scalastyle:on
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)

http://git-wip-us.apache.org/repos/asf/spark/blob/73801880/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index b5f161c..e375604 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io.{File, IOException}
 import java.net.URI
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = newStoreProvider(hadoopConf = conf)
     provider.getStore(0).commit()
     provider.getStore(0).commit()
+
+    // Verify we don't leak temp files
+    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+      null, true).asScala.filter(_.getName.startsWith("temp-"))
+    assert(tempFiles.isEmpty)
   }

   test("corrupted file handling") {
 
   test("corrupted file handling") {





spark git commit: [SPARK-19602][SQL][TESTS] Add tests for qualified column names

2017-03-02 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 93ae176e8 -> f37bb1430


[SPARK-19602][SQL][TESTS] Add tests for qualified column names

## What changes were proposed in this pull request?
- Add tests covering different scenarios with qualified column names.
- Please see Section 2 of the design doc for the various test scenarios 
[here](https://issues.apache.org/jira/secure/attachment/12854681/Design_ColResolution_JIRA19602.pdf).
- As part of SPARK-19602, changes are made to support three-part column names. 
To aid review and reduce the diff, the test scenarios are separated out into this PR.

## How was this patch tested?
- This is a **test-only** change. The individual test suites were run successfully.

Author: Sunitha Kambhampati 

Closes #17067 from skambha/colResolutionTests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f37bb143
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f37bb143
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f37bb143

Branch: refs/heads/master
Commit: f37bb143022ea10107877c80c5c73bd77aeda7ff
Parents: 93ae176
Author: Sunitha Kambhampati 
Authored: Thu Mar 2 21:19:22 2017 -0800
Committer: Xiao Li 
Committed: Thu Mar 2 21:19:22 2017 -0800

--
 .../inputs/columnresolution-negative.sql|  36 ++
 .../sql-tests/inputs/columnresolution-views.sql |  25 ++
 .../sql-tests/inputs/columnresolution.sql   |  88 
 .../results/columnresolution-negative.sql.out   | 240 ++
 .../results/columnresolution-views.sql.out  | 140 ++
 .../sql-tests/results/columnresolution.sql.out  | 447 +++
 .../sql-tests/results/inner-join.sql.out|   3 +-
 .../apache/spark/sql/SQLQueryTestSuite.scala|   6 +-
 8 files changed, 980 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f37bb143/sql/core/src/test/resources/sql-tests/inputs/columnresolution-negative.sql
--
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/columnresolution-negative.sql 
b/sql/core/src/test/resources/sql-tests/inputs/columnresolution-negative.sql
new file mode 100644
index 000..1caa45c
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/columnresolution-negative.sql
@@ -0,0 +1,36 @@
+-- Negative testcases for column resolution
+CREATE DATABASE mydb1;
+USE mydb1;
+CREATE TABLE t1 USING parquet AS SELECT 1 AS i1;
+
+CREATE DATABASE mydb2;
+USE mydb2;
+CREATE TABLE t1 USING parquet AS SELECT 20 AS i1;
+
+-- Negative tests: column resolution scenarios with ambiguous cases in join 
queries
+SET spark.sql.crossJoin.enabled = true;
+USE mydb1;
+SELECT i1 FROM t1, mydb1.t1;
+SELECT t1.i1 FROM t1, mydb1.t1;
+SELECT mydb1.t1.i1 FROM t1, mydb1.t1;
+SELECT i1 FROM t1, mydb2.t1;
+SELECT t1.i1 FROM t1, mydb2.t1;
+USE mydb2;
+SELECT i1 FROM t1, mydb1.t1;
+SELECT t1.i1 FROM t1, mydb1.t1;
+SELECT i1 FROM t1, mydb2.t1;
+SELECT t1.i1 FROM t1, mydb2.t1;
+SELECT db1.t1.i1 FROM t1, mydb2.t1;
+SET spark.sql.crossJoin.enabled = false;
+
+-- Negative tests
+USE mydb1;
+SELECT mydb1.t1 FROM t1;
+SELECT t1.x.y.* FROM t1;
+SELECT t1 FROM mydb1.t1;
+USE mydb2;
+SELECT mydb1.t1.i1 FROM t1;
+
+-- reset
+DROP DATABASE mydb1 CASCADE;
+DROP DATABASE mydb2 CASCADE;

http://git-wip-us.apache.org/repos/asf/spark/blob/f37bb143/sql/core/src/test/resources/sql-tests/inputs/columnresolution-views.sql
--
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/columnresolution-views.sql 
b/sql/core/src/test/resources/sql-tests/inputs/columnresolution-views.sql
new file mode 100644
index 000..d3f9287
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/columnresolution-views.sql
@@ -0,0 +1,25 @@
+-- Tests for qualified column names for the view code-path
+-- Test scenario with Temporary view
+CREATE OR REPLACE TEMPORARY VIEW view1 AS SELECT 2 AS i1;
+SELECT view1.* FROM view1;
+SELECT * FROM view1;
+SELECT view1.i1 FROM view1;
+SELECT i1 FROM view1;
+SELECT a.i1 FROM view1 AS a;
+SELECT i1 FROM view1 AS a;
+-- cleanup
+DROP VIEW view1;
+
+-- Test scenario with Global Temp view
+CREATE OR REPLACE GLOBAL TEMPORARY VIEW view1 as SELECT 1 as i1;
+SELECT * FROM global_temp.view1;
+-- TODO: Support this scenario
+SELECT global_temp.view1.* FROM global_temp.view1;
+SELECT i1 FROM global_temp.view1;
+-- TODO: Support this scenario
+SELECT global_temp.view1.i1 FROM global_temp.view1;
+SELECT view1.i1 FROM global_temp.view1;
+SELECT a.i1 FROM global_temp.view1 AS a;
+SELECT i1 FROM global_temp.view1 AS a;
+-- cleanup
+DROP VIEW global_temp.view1;


spark git commit: [SPARK-19779][SS] Delete needless tmp file after restart structured streaming job

2017-03-02 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master f37bb1430 -> e24f21b5f


[SPARK-19779][SS] Delete needless tmp file after restart structured streaming 
job

## What changes were proposed in this pull request?

[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)

PR https://github.com/apache/spark/pull/17012 fixed restarting a Structured
Streaming application that uses HDFS as the file system, but a problem remains:
a temporary delta file is still left behind in HDFS, and Structured Streaming
never deletes this tmp file after the streaming job is restarted.

## How was this patch tested?
 unit tests

Author: guifeng 

Closes #17124 from gf53520/SPARK-19779.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e24f21b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e24f21b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e24f21b5

Branch: refs/heads/master
Commit: e24f21b5f8365ed25346e986748b393e0b4be25c
Parents: f37bb14
Author: guifeng 
Authored: Thu Mar 2 21:19:29 2017 -0800
Committer: Shixiong Zhu 
Committed: Thu Mar 2 21:19:29 2017 -0800

--
 .../streaming/state/HDFSBackedStateStoreProvider.scala| 4 +++-
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 7 +++
 2 files changed, 10 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e24f21b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 2d29940..ab1204a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider(
       // semantically correct because Structured Streaming requires rerunning a batch should
       // generate the same output. (SPARK-19677)
       // scalastyle:on
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)

http://git-wip-us.apache.org/repos/asf/spark/blob/e24f21b5/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index dc4e935..e848f74 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io.{File, IOException}
 import java.net.URI
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = newStoreProvider(hadoopConf = conf)
     provider.getStore(0).commit()
     provider.getStore(0).commit()
+
+    // Verify we don't leak temp files
+    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+      null, true).asScala.filter(_.getName.startsWith("temp-"))
+    assert(tempFiles.isEmpty)
   }

   test("corrupted file handling") {
 
   test("corrupted file handling") {





spark git commit: [SPARK-19745][ML] SVCAggregator captures coefficients in its closure

2017-03-02 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 8417a7ae6 -> 93ae176e8


[SPARK-19745][ML] SVCAggregator captures coefficients in its closure

## What changes were proposed in this pull request?

JIRA: [SPARK-19745](https://issues.apache.org/jira/browse/SPARK-19745)

Reorganize SVCAggregator to avoid serializing coefficients. This patch also 
makes the gradient array a `lazy val` which will avoid materializing a large 
array on the driver before shipping the class to the executors. This 
improvement stems from https://github.com/apache/spark/pull/16037. Actually, 
probably all ML aggregators can benefit from this.

We can either: a.) separate the gradient improvement into another patch b.) 
keep what's here _plus_ add the lazy evaluation to all other aggregators in 
this patch or c.) keep it as is.
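
For context, here is a minimal sketch of the serialization pattern applied here (a toy aggregator, not the actual `LinearSVCAggregator`): marking the broadcast-derived array `@transient lazy` keeps it out of the serialized closure, and a `lazy` gradient buffer is never materialized on the driver.

```
import org.apache.spark.broadcast.Broadcast

class ToyAggregator(numFeatures: Int, bcCoefficients: Broadcast[Array[Double]])
  extends Serializable {

  // Re-derived on each executor after deserialization; never shipped with the closure.
  @transient private lazy val coefficientsArray: Array[Double] = bcCoefficients.value

  // Allocated lazily, so the driver-side instance never holds the large buffer.
  private lazy val gradientSumArray: Array[Double] = new Array[Double](numFeatures)

  def add(features: Array[Double], label: Double, weight: Double): this.type = {
    var dot = 0.0
    var i = 0
    while (i < numFeatures) { dot += coefficientsArray(i) * features(i); i += 1 }
    // ... update gradientSumArray and the loss using dot, label and weight ...
    this
  }
}
```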

## How was this patch tested?

This is an interesting question! I don't know of a reasonable way to test this 
right now. Ideally, we could perform an optimization and look at the shuffle 
write data for each task, comparing the size to what we know it should be: 
`numCoefficients * 8 bytes`. I'm not sure there is a good way to do that right 
now. We could discuss this here or in another JIRA, but I suspect it would be a 
significant undertaking.

Author: sethah 

Closes #17076 from sethah/svc_agg.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93ae176e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93ae176e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93ae176e

Branch: refs/heads/master
Commit: 93ae176e8943d6b346c80deea778bffd188366a1
Parents: 8417a7a
Author: sethah 
Authored: Thu Mar 2 19:38:25 2017 -0800
Committer: Yanbo Liang 
Committed: Thu Mar 2 19:38:25 2017 -0800

--
 .../spark/ml/classification/LinearSVC.scala | 29 
 .../ml/classification/LogisticRegression.scala  |  2 +-
 .../spark/ml/clustering/GaussianMixture.scala   |  6 ++--
 .../ml/regression/AFTSurvivalRegression.scala   |  2 +-
 .../spark/ml/regression/LinearRegression.scala  |  2 +-
 .../ml/classification/LinearSVCSuite.scala  | 17 +++-
 6 files changed, 34 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/93ae176e/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index bf6e76d..f76b14e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -440,19 +440,14 @@ private class LinearSVCAggregator(

   private val numFeatures: Int = bcFeaturesStd.value.length
   private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
-  private val coefficients: Vector = bcCoefficients.value
   private var weightSum: Double = 0.0
   private var lossSum: Double = 0.0
-  require(numFeaturesPlusIntercept == coefficients.size, s"Dimension mismatch. Coefficients " +
-    s"length ${coefficients.size}, FeaturesStd length ${numFeatures}, fitIntercept: $fitIntercept")
-
-  private val coefficientsArray = coefficients match {
-    case dv: DenseVector => dv.values
-    case _ =>
-      throw new IllegalArgumentException(
-        s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
+  @transient private lazy val coefficientsArray = bcCoefficients.value match {
+    case DenseVector(values) => values
+    case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
+      s" but got type ${bcCoefficients.value.getClass}.")
   }
-  private val gradientSumArray = Array.fill[Double](coefficientsArray.length)(0)
+  private lazy val gradientSumArray = new Array[Double](numFeaturesPlusIntercept)

   /**
    * Add a new training instance to this LinearSVCAggregator, and update the loss and gradient
@@ -463,6 +458,9 @@ private class LinearSVCAggregator(
    */
   def add(instance: Instance): this.type = {
     instance match { case Instance(label, weight, features) =>
+      require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
+      require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
+        s" Expecting $numFeatures but got ${features.size}.")
       if (weight == 0.0) return this
       val localFeaturesStd = bcFeaturesStd.value
       val localCoefficients = coefficientsArray
@@ -530,18 +528,15 @@ private class LinearSVCAggregator(
     this
   }

-  def loss: 

spark git commit: [SPARK-19750][UI][BRANCH-2.1] Fix redirect issue from http to https

2017-03-02 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e30fe1c6a -> 491b47a16


[SPARK-19750][UI][BRANCH-2.1] Fix redirect issue from http to https

## What changes were proposed in this pull request?

If the Spark UI port (4040) is not set, port number 0 is chosen, which makes the 
HTTPS port also resolve to 0. The Spark 2.1 code uses this HTTPS port (0) for 
the redirect, so when a redirect is triggered it points to a wrong URL, like:

```
/tmp/temp$ wget http://172.27.25.134:55015
--2017-02-23 12:13:54--  http://172.27.25.134:55015/
Connecting to 172.27.25.134:55015... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://172.27.25.134:0/ [following]
--2017-02-23 12:13:54--  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

--2017-02-23 12:13:55--  (try: 2)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

--2017-02-23 12:13:57--  (try: 3)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

--2017-02-23 12:14:00--  (try: 4)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

```

So instead of using 0 for the redirect, we should pick the actually bound port.

This issue only exists in Spark 2.1 and earlier, and can be reproduced in YARN 
cluster mode.
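
The extended test essentially inspects the 302 response without following it, along these lines (plain `java.net` API; the helper name is ours, the real code lives in `TestUtils`):

```
import java.net.{HttpURLConnection, URL}

def redirectLocation(url: URL): Option[String] = {
  val conn = url.openConnection().asInstanceOf[HttpURLConnection]
  conn.setInstanceFollowRedirects(false)        // keep the 302 instead of chasing it
  try {
    conn.connect()
    if (conn.getResponseCode == HttpURLConnection.HTTP_MOVED_TEMP) {
      // Should contain the actually bound https port, never 0.
      Option(conn.getHeaderField("Location"))
    } else {
      None
    }
  } finally {
    conn.disconnect()
  }
}
```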

## How was this patch tested?

The current redirect unit test doesn't cover this issue, so it is extended to 
verify the behavior correctly.

Author: jerryshao 

Closes #17083 from jerryshao/SPARK-19750.

(cherry picked from commit 3a7591ad5315308d24c0e444ce304ff78aef2304)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/491b47a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/491b47a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/491b47a1

Branch: refs/heads/branch-2.0
Commit: 491b47a162030cb0dbb81ae32898e109707a4709
Parents: e30fe1c
Author: jerryshao 
Authored: Thu Mar 2 17:18:52 2017 -0800
Committer: Marcelo Vanzin 
Committed: Thu Mar 2 17:19:04 2017 -0800

--
 .../main/scala/org/apache/spark/TestUtils.scala | 23 
 .../scala/org/apache/spark/ui/JettyUtils.scala  | 10 +
 .../scala/org/apache/spark/ui/UISuite.scala |  6 -
 3 files changed, 30 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/491b47a1/core/src/main/scala/org/apache/spark/TestUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index a76395b..3bf1d31 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -27,6 +27,7 @@ import java.util.Arrays
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 import javax.net.ssl._
+import javax.servlet.http.HttpServletResponse
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 
 import scala.collection.JavaConverters._
@@ -186,12 +187,12 @@ private[spark] object TestUtils {
   }
 
   /**
-   * Returns the response code from an HTTP(S) URL.
+   * Returns the response code and url (if redirected) from an HTTP(S) URL.
*/
-  def httpResponseCode(
+  def httpResponseCodeAndURL(
   url: URL,
   method: String = "GET",
-  headers: Seq[(String, String)] = Nil): Int = {
+  headers: Seq[(String, String)] = Nil): (Int, Option[String]) = {
 val connection = url.openConnection().asInstanceOf[HttpURLConnection]
 connection.setRequestMethod(method)
 headers.foreach { case (k, v) => connection.setRequestProperty(k, v) }
@@ -210,16 +211,30 @@ private[spark] object TestUtils {
   sslCtx.init(null, Array(trustManager), new SecureRandom())
   
connection.asInstanceOf[HttpsURLConnection].setSSLSocketFactory(sslCtx.getSocketFactory())
   connection.asInstanceOf[HttpsURLConnection].setHostnameVerifier(verifier)
+  connection.setInstanceFollowRedirects(false)
 }
 
 try {
   connection.connect()
-  connection.getResponseCode()
+  if (connection.getResponseCode == HttpServletResponse.SC_FOUND) {
+        (connection.getResponseCode, Option(connection.getHeaderField("Location")))
+  } else {
+(connection.getResponseCode(), None)
+  }
 } finally {
   connection.disconnect()
 }
   }
 
+  /**
+   * Returns the response code from an HTTP(S) URL.
+   */
+  def httpResponseCode(
+  url: URL,
+  method: String = "GET",
+  headers: 

spark git commit: [SPARK-19750][UI][BRANCH-2.1] Fix redirect issue from http to https

2017-03-02 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 27347b5f2 -> 3a7591ad5


[SPARK-19750][UI][BRANCH-2.1] Fix redirect issue from http to https

## What changes were proposed in this pull request?

If the Spark UI port (4040) is not set, port number 0 is chosen, which makes the 
HTTPS port also resolve to 0. The Spark 2.1 code uses this HTTPS port (0) for 
the redirect, so when a redirect is triggered it points to a wrong URL, like:

```
/tmp/temp$ wget http://172.27.25.134:55015
--2017-02-23 12:13:54--  http://172.27.25.134:55015/
Connecting to 172.27.25.134:55015... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://172.27.25.134:0/ [following]
--2017-02-23 12:13:54--  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

--2017-02-23 12:13:55--  (try: 2)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

--2017-02-23 12:13:57--  (try: 3)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

--2017-02-23 12:14:00--  (try: 4)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

```

So instead of using 0 for the redirect, we should pick the actually bound port.

This issue only exists in Spark 2.1 and earlier, and can be reproduced in YARN 
cluster mode.

## How was this patch tested?

The current redirect unit test doesn't cover this issue, so it is extended to 
verify the behavior correctly.

Author: jerryshao 

Closes #17083 from jerryshao/SPARK-19750.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a7591ad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a7591ad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a7591ad

Branch: refs/heads/branch-2.1
Commit: 3a7591ad5315308d24c0e444ce304ff78aef2304
Parents: 27347b5
Author: jerryshao 
Authored: Thu Mar 2 17:18:52 2017 -0800
Committer: Marcelo Vanzin 
Committed: Thu Mar 2 17:18:52 2017 -0800

--
 .../main/scala/org/apache/spark/TestUtils.scala | 23 
 .../scala/org/apache/spark/ui/JettyUtils.scala  | 10 +
 .../scala/org/apache/spark/ui/UISuite.scala |  6 -
 3 files changed, 30 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3a7591ad/core/src/main/scala/org/apache/spark/TestUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index c3ccdb0..5cdc4ee 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -27,6 +27,7 @@ import java.util.Arrays
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 import javax.net.ssl._
+import javax.servlet.http.HttpServletResponse
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 
 import scala.collection.JavaConverters._
@@ -186,12 +187,12 @@ private[spark] object TestUtils {
   }
 
   /**
-   * Returns the response code from an HTTP(S) URL.
+   * Returns the response code and url (if redirected) from an HTTP(S) URL.
*/
-  def httpResponseCode(
+  def httpResponseCodeAndURL(
   url: URL,
   method: String = "GET",
-  headers: Seq[(String, String)] = Nil): Int = {
+  headers: Seq[(String, String)] = Nil): (Int, Option[String]) = {
 val connection = url.openConnection().asInstanceOf[HttpURLConnection]
 connection.setRequestMethod(method)
 headers.foreach { case (k, v) => connection.setRequestProperty(k, v) }
@@ -210,16 +211,30 @@ private[spark] object TestUtils {
   sslCtx.init(null, Array(trustManager), new SecureRandom())
   
connection.asInstanceOf[HttpsURLConnection].setSSLSocketFactory(sslCtx.getSocketFactory())
   connection.asInstanceOf[HttpsURLConnection].setHostnameVerifier(verifier)
+  connection.setInstanceFollowRedirects(false)
 }
 
 try {
   connection.connect()
-  connection.getResponseCode()
+  if (connection.getResponseCode == HttpServletResponse.SC_FOUND) {
+        (connection.getResponseCode, Option(connection.getHeaderField("Location")))
+  } else {
+(connection.getResponseCode(), None)
+  }
 } finally {
   connection.disconnect()
 }
   }
 
+  /**
+   * Returns the response code from an HTTP(S) URL.
+   */
+  def httpResponseCode(
+  url: URL,
+  method: String = "GET",
+  headers: Seq[(String, String)] = Nil): Int = {
+httpResponseCodeAndURL(url, method, headers)._1
+  }
 }
 
 


spark git commit: [SPARK-19276][CORE] Fetch Failure handling robust to user error handling

2017-03-02 Thread kayousterhout
Repository: spark
Updated Branches:
  refs/heads/master 433d9eb61 -> 8417a7ae6


[SPARK-19276][CORE] Fetch Failure handling robust to user error handling

## What changes were proposed in this pull request?

Fault tolerance in Spark requires special handling of shuffle fetch failures: 
the executor catches the FetchFailedException and sends a special message back 
to the driver.

However, intervening user code can intercept that exception and wrap it with 
something else; this even happens in Spark SQL. So rather than checking only 
the thrown exception, we now store the fetch failure directly in the 
TaskContext, where user code can't touch it.
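
To illustrate why inspecting the thrown exception alone is fragile (a contrived example, not actual Spark code): user-level error handling in the middle of a task can re-wrap the fetch failure, hiding it from the executor's `catch`.

```
import org.apache.spark.shuffle.FetchFailedException

// A task body wrapped by user error handling: the FetchFailedException thrown by a
// shuffle read gets re-wrapped, so matching on the exception type no longer sees it.
def userMapFunction[T](compute: => T): T = {
  try {
    compute
  } catch {
    case e: FetchFailedException =>
      throw new RuntimeException("something went wrong in user code", e)
  }
}
```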

## How was this patch tested?

Added a test case which failed before the fix. The full test suite was run via Jenkins.

Author: Imran Rashid 

Closes #16639 from squito/SPARK-19276.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8417a7ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8417a7ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8417a7ae

Branch: refs/heads/master
Commit: 8417a7ae6c0ea3fb8dc41bc492fc9513d1ad24af
Parents: 433d9eb
Author: Imran Rashid 
Authored: Thu Mar 2 16:46:01 2017 -0800
Committer: Kay Ousterhout 
Committed: Thu Mar 2 16:46:01 2017 -0800

--
 .../scala/org/apache/spark/TaskContext.scala|   7 +
 .../org/apache/spark/TaskContextImpl.scala  |  11 ++
 .../org/apache/spark/executor/Executor.scala|  33 -
 .../scala/org/apache/spark/scheduler/Task.scala |   9 +-
 .../spark/shuffle/FetchFailedException.scala|  13 +-
 .../apache/spark/executor/ExecutorSuite.scala   | 139 ++-
 project/MimaExcludes.scala  |   3 +
 7 files changed, 198 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8417a7ae/core/src/main/scala/org/apache/spark/TaskContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala 
b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 0fd777e..f0867ec 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -24,6 +24,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.{AccumulatorV2, TaskCompletionListener, 
TaskFailureListener}
 
 
@@ -190,4 +191,10 @@ abstract class TaskContext extends Serializable {
*/
   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit
 
+  /**
+   * Record that this task has failed due to a fetch failure from a remote 
host.  This allows
+   * fetch-failure handling to get triggered by the driver, regardless of 
intervening user-code.
+   */
+  private[spark] def setFetchFailed(fetchFailed: FetchFailedException): Unit
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8417a7ae/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala 
b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index c904e08..dc0d128 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -26,6 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util._
 
 private[spark] class TaskContextImpl(
@@ -56,6 +57,10 @@ private[spark] class TaskContextImpl(
   // Whether the task has failed.
   @volatile private var failed: Boolean = false
 
+  // If there was a fetch failure in the task, we store it here, to make sure 
user-code doesn't
+  // hide the exception.  See SPARK-19276
+  @volatile private var _fetchFailedException: Option[FetchFailedException] = 
None
+
   override def addTaskCompletionListener(listener: TaskCompletionListener): 
this.type = {
 onCompleteCallbacks += listener
 this
@@ -126,4 +131,10 @@ private[spark] class TaskContextImpl(
 taskMetrics.registerAccumulator(a)
   }
 
+  private[spark] override def setFetchFailed(fetchFailed: 
FetchFailedException): Unit = {
+this._fetchFailedException = Option(fetchFailed)
+  }
+
+  private[spark] def fetchFailed: Option[FetchFailedException] = 
_fetchFailedException
+
 }


spark git commit: [SPARK-19631][CORE] OutputCommitCoordinator should not allow commits for already failed tasks

2017-03-02 Thread kayousterhout
Repository: spark
Updated Branches:
  refs/heads/master 5ae3516bf -> 433d9eb61


[SPARK-19631][CORE] OutputCommitCoordinator should not allow commits for 
already failed tasks

## What changes were proposed in this pull request?

Previously it was possible for there to be a race between a task failure and 
committing the output of a task. For example, the driver may mark a task 
attempt as failed due to an executor heartbeat timeout (possibly due to GC), 
but the task attempt actually ends up coordinating with the 
OutputCommitCoordinator once the executor recovers and committing its result. 
This will lead to any retry attempt failing because the task result has already 
been committed despite the original attempt failing.

This ensures that any previously failed task attempts cannot enter the commit 
protocol.
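
A minimal sketch of the authorization rule (simplified; not the actual `OutputCommitCoordinator` code): an attempt that has already been recorded as failed can never become the authorized committer.

```
import scala.collection.mutable

class ToyCommitCoordinator(numPartitions: Int) {
  private val NoCommitter = -1
  private val authorized = Array.fill(numPartitions)(NoCommitter)
  private val failures = mutable.Map.empty[Int, mutable.Set[Int]]

  def taskFailed(partition: Int, attempt: Int): Unit = synchronized {
    failures.getOrElseUpdate(partition, mutable.Set.empty) += attempt
    // Release the lock if the failed attempt was the authorized committer.
    if (authorized(partition) == attempt) authorized(partition) = NoCommitter
  }

  def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
    if (failures.get(partition).exists(_.contains(attempt))) {
      false  // previously failed attempts may not enter the commit protocol
    } else if (authorized(partition) == NoCommitter) {
      authorized(partition) = attempt
      true   // first healthy attempt wins the commit lock
    } else {
      authorized(partition) == attempt
    }
  }
}
```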

## How was this patch tested?

Added a unit test

Author: Patrick Woody 

Closes #16959 from pwoody/pw/recordFailuresForCommitter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/433d9eb6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/433d9eb6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/433d9eb6

Branch: refs/heads/master
Commit: 433d9eb6151a547af967cc1ac983a789bed60704
Parents: 5ae3516
Author: Patrick Woody 
Authored: Thu Mar 2 15:55:32 2017 -0800
Committer: Kay Ousterhout 
Committed: Thu Mar 2 15:55:32 2017 -0800

--
 .../scheduler/OutputCommitCoordinator.scala | 59 
 .../OutputCommitCoordinatorSuite.scala  | 11 
 2 files changed, 46 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/433d9eb6/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 08d220b..83d87b5 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -48,25 +48,29 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
   private type StageId = Int
   private type PartitionId = Int
   private type TaskAttemptNumber = Int
-
   private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1
+  private case class StageState(numPartitions: Int) {
+val authorizedCommitters = 
Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
+val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
+  }
 
   /**
-   * Map from active stages's id => partition id => task attempt with 
exclusive lock on committing
-   * output for that partition.
+   * Map from active stages's id => authorized task attempts for each 
partition id, which hold an
+   * exclusive lock on committing task output for that partition, as well as 
any known failed
+   * attempts in the stage.
*
* Entries are added to the top-level map when stages start and are removed 
they finish
* (either successfully or unsuccessfully).
*
* Access to this map should be guarded by synchronizing on the 
OutputCommitCoordinator instance.
*/
-  private val authorizedCommittersByStage = mutable.Map[StageId, 
Array[TaskAttemptNumber]]()
+  private val stageStates = mutable.Map[StageId, StageState]()
 
   /**
* Returns whether the OutputCommitCoordinator's internal data structures 
are all empty.
*/
   def isEmpty: Boolean = {
-authorizedCommittersByStage.isEmpty
+stageStates.isEmpty
   }
 
   /**
@@ -105,19 +109,13 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
* @param maxPartitionId the maximum partition id that could appear in this 
stage's tasks (i.e.
*   the maximum possible value of 
`context.partitionId`).
*/
-  private[scheduler] def stageStart(
-  stage: StageId,
-  maxPartitionId: Int): Unit = {
-val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
-java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
-synchronized {
-  authorizedCommittersByStage(stage) = arr
-}
+  private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit 
= synchronized {
+stageStates(stage) = new StageState(maxPartitionId + 1)
   }
 
   // Called by DAGScheduler
   private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
-authorizedCommittersByStage.remove(stage)
+stageStates.remove(stage)
   }
 
   // Called by DAGScheduler
@@ -126,7 +124,7 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: 

spark git commit: [SPARK-19720][CORE] Redact sensitive information from SparkSubmit console

2017-03-02 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 9cca3dbf4 -> 5ae3516bf


[SPARK-19720][CORE] Redact sensitive information from SparkSubmit console

## What changes were proposed in this pull request?
This change redacts sensitive information (based on the `spark.redaction.regex` 
property) from the SparkSubmit console logs. Such sensitive information is 
already being redacted from event logs, YARN logs, etc.

## How was this patch tested?
Testing was done manually to make sure that the console logs were not printing 
any
sensitive information.

Here's some output from the console:

```
Spark properties used, including those specified through
 --conf and those from the properties file /etc/spark2/conf/spark-defaults.conf:
  (spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD,*(redacted))
  (spark.authenticate,false)
  (spark.executorEnv.HADOOP_CREDSTORE_PASSWORD,*(redacted))
```

```
System properties:
(spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD,*(redacted))
(spark.authenticate,false)
(spark.executorEnv.HADOOP_CREDSTORE_PASSWORD,*(redacted))
```
There is a risk that if new print statements are added to the console output 
down the road, sensitive information may still leak, since there is no test 
that asserts on the console log output. I considered it out of scope for this 
JIRA to write an integration test to make sure new leaks don't happen in the 
future.

Running unit tests to make sure nothing else is broken by this change.
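
For reference, a rough sketch of key-based regex redaction as described above (the real logic lives in `org.apache.spark.util.Utils.redact` and is driven by `spark.redaction.regex`; the sample values here are made up):

```
import scala.util.matching.Regex

val replacementText = "*********(redacted)"

// Mask the value of every entry whose key matches the redaction pattern.
def redact(pattern: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] =
  kvs.map {
    case (key, _) if pattern.findFirstIn(key).isDefined => (key, replacementText)
    case kv => kv
  }

val props = Seq(
  ("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", "hunter2"),
  ("spark.authenticate", "false"))
redact("(?i)secret|password".r, props).foreach { case (k, v) => println(s"($k,$v)") }
// prints: (spark.executorEnv.HADOOP_CREDSTORE_PASSWORD,*********(redacted))
//         (spark.authenticate,false)
```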

Author: Mark Grover 

Closes #17047 from markgrover/master_redaction.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ae3516b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ae3516b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ae3516b

Branch: refs/heads/master
Commit: 5ae3516bfb7716f1793eb76b4fdc720b31829d07
Parents: 9cca3db
Author: Mark Grover 
Authored: Thu Mar 2 10:33:56 2017 -0800
Committer: Marcelo Vanzin 
Committed: Thu Mar 2 10:33:56 2017 -0800

--
 .../org/apache/spark/deploy/SparkSubmit.scala   |  3 ++-
 .../spark/deploy/SparkSubmitArguments.scala | 12 ---
 .../scala/org/apache/spark/util/Utils.scala | 21 +++-
 3 files changed, 31 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5ae3516b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 5ffdedd..1e50eb6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -665,7 +665,8 @@ object SparkSubmit extends CommandLineUtils {
 if (verbose) {
   printStream.println(s"Main class:\n$childMainClass")
   printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
-  printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
+  // sysProps may contain sensitive information, so redact before printing
+  printStream.println(s"System 
properties:\n${Utils.redact(sysProps).mkString("\n")}")
   printStream.println(s"Classpath 
elements:\n${childClasspath.mkString("\n")}")
   printStream.println("\n")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae3516b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index dee7734..0614d80 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -84,9 +84,15 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 // scalastyle:off println
 if (verbose) SparkSubmit.printStream.println(s"Using properties file: 
$propertiesFile")
 Option(propertiesFile).foreach { filename =>
-  Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
+  val properties = Utils.getPropertiesFromFile(filename)
+  properties.foreach { case (k, v) =>
 defaultProperties(k) = v
-if (verbose) SparkSubmit.printStream.println(s"Adding default 
property: $k=$v")
+  }
+  // Property files may contain sensitive information, so redact before 
printing
+  if (verbose) {
+Utils.redact(properties).foreach { case (k, v) =>
+  SparkSubmit.printStream.println(s"Adding default property: $k=$v")
+}
   }
 }
 

spark git commit: [SPARK-19345][ML][DOC] Add doc for "coldStartStrategy" usage in ALS

2017-03-02 Thread mlnick
Repository: spark
Updated Branches:
  refs/heads/master 50c08e82f -> 9cca3dbf4


[SPARK-19345][ML][DOC] Add doc for "coldStartStrategy" usage in ALS

[SPARK-14489](https://issues.apache.org/jira/browse/SPARK-14489) added the 
ability to skip `NaN` predictions during `ALSModel.transform`. This PR adds 
documentation for the `coldStartStrategy` param to the ALS user guide, and adds 
code to the examples to illustrate usage.

## How was this patch tested?

Doc and example change only. Built the HTML docs locally and verified that the 
example code builds and runs in the shell for Scala/Python.
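
The Scala counterpart of the example change looks like this (`training` and `test` DataFrames with userId/movieId/rating columns are assumed):

```
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

val als = new ALS()
  .setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
val model = als.fit(training)

// Set the cold start strategy to "drop" so NaN predictions don't poison the metric.
model.setColdStartStrategy("drop")
val predictions = model.transform(test)

val rmse = new RegressionEvaluator()
  .setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")
  .evaluate(predictions)
println(s"Root-mean-square error = $rmse")
```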

Author: Nick Pentreath 

Closes #17102 from MLnick/SPARK-19345-coldstart-doc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cca3dbf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cca3dbf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cca3dbf

Branch: refs/heads/master
Commit: 9cca3dbf4add9004a769dee1a556987e37230294
Parents: 50c08e8
Author: Nick Pentreath 
Authored: Thu Mar 2 15:51:16 2017 +0200
Committer: Nick Pentreath 
Committed: Thu Mar 2 15:51:16 2017 +0200

--
 docs/ml-collaborative-filtering.md  | 28 
 .../spark/examples/ml/JavaALSExample.java   |  2 ++
 examples/src/main/python/ml/als_example.py  |  4 ++-
 .../apache/spark/examples/ml/ALSExample.scala   |  2 ++
 4 files changed, 35 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9cca3dbf/docs/ml-collaborative-filtering.md
--
diff --git a/docs/ml-collaborative-filtering.md 
b/docs/ml-collaborative-filtering.md
index cfe8351..58f2d4b 100644
--- a/docs/ml-collaborative-filtering.md
+++ b/docs/ml-collaborative-filtering.md
@@ -59,6 +59,34 @@ This approach is named "ALS-WR" and discussed in the paper
 It makes `regParam` less dependent on the scale of the dataset, so we can 
apply the
 best parameter learned from a sampled subset to the full dataset and expect 
similar performance.
 
+### Cold-start strategy
+
+When making predictions using an `ALSModel`, it is common to encounter users 
and/or items in the 
+test dataset that were not present during training the model. This typically 
occurs in two 
+scenarios:
+
+1. In production, for new users or items that have no rating history and on 
which the model has not 
+been trained (this is the "cold start problem").
+2. During cross-validation, the data is split between training and evaluation 
sets. When using 
+simple random splits as in Spark's `CrossValidator` or `TrainValidationSplit`, 
it is actually 
+very common to encounter users and/or items in the evaluation set that are not 
in the training set
+
+By default, Spark assigns `NaN` predictions during `ALSModel.transform` when a 
user and/or item 
+factor is not present in the model. This can be useful in a production system, 
since it indicates 
+a new user or item, and so the system can make a decision on some fallback to 
use as the prediction.
+
+However, this is undesirable during cross-validation, since any `NaN` 
predicted values will result
+in `NaN` results for the evaluation metric (for example when using 
`RegressionEvaluator`).
+This makes model selection impossible.
+
+Spark allows users to set the `coldStartStrategy` parameter
+to "drop" in order to drop any rows in the `DataFrame` of predictions that 
contain `NaN` values. 
+The evaluation metric will then be computed over the non-`NaN` data and will 
be valid. 
+Usage of this parameter is illustrated in the example below.
+
+**Note:** currently the supported cold start strategies are "nan" (the default 
behavior mentioned 
+above) and "drop". Further strategies may be supported in future.
+
 **Examples**
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9cca3dbf/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
index 33ba668..81970b7 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
@@ -103,6 +103,8 @@ public class JavaALSExample {
 ALSModel model = als.fit(training);
 
 // Evaluate the model by computing the RMSE on the test data
+// Note we set cold start strategy to 'drop' to ensure we don't get NaN 
evaluation metrics
+model.setColdStartStrategy("drop");
 Dataset predictions = model.transform(test);
 
 RegressionEvaluator evaluator = new RegressionEvaluator()


spark git commit: [SPARK-19766][SQL][BRANCH-2.0] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule

2017-03-02 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c9c45d97b -> e30fe1c6a


[SPARK-19766][SQL][BRANCH-2.0] Constant alias columns in INNER JOIN should not 
be folded by FoldablePropagation rule

This PR is the fix for branch-2.0.

Refer to #17099.

gatorsmile

Author: Stan Zhai 

Closes #17131 from stanzhai/fix-inner-join-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e30fe1c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e30fe1c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e30fe1c6

Branch: refs/heads/branch-2.0
Commit: e30fe1c6aa91f74cf3bed74f3be3eb69a6eaf1b4
Parents: c9c45d9
Author: Stan Zhai 
Authored: Thu Mar 2 04:24:43 2017 -0800
Committer: Herman van Hovell 
Committed: Thu Mar 2 04:24:43 2017 -0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  |  2 +-
 .../optimizer/FoldablePropagationSuite.scala| 14 
 .../resources/sql-tests/inputs/inner-join.sql   | 17 +
 .../sql-tests/results/inner-join.sql.out| 67 
 4 files changed, 99 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e30fe1c6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3a71463..f3cf1f7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -669,7 +669,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
 // join is not always picked from its children, but can also be null.
    // TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes
 // of outer join.
-case j @ Join(_, _, Inner, _) =>
+case j @ Join(_, _, Inner, _) if !stop =>
   j.transformExpressions(replaceFoldable)
 
    // We can fold the projections an expand holds. However expand changes the output columns

http://git-wip-us.apache.org/repos/asf/spark/blob/e30fe1c6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
index bbef212..9d18827 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
@@ -131,6 +131,20 @@ class FoldablePropagationSuite extends PlanTest {
 comparePlans(optimized, correctAnswer)
   }
 
+  test("Propagate in inner join") {
+val ta = testRelation.select('a, Literal(1).as('tag))
+  .union(testRelation.select('a, Literal(2).as('tag)))
+  .subquery('ta)
+val tb = testRelation.select('a, Literal(1).as('tag))
+  .union(testRelation.select('a, Literal(2).as('tag)))
+  .subquery('tb)
+val query = ta.join(tb, Inner,
+  Some("ta.a".attr === "tb.a".attr && "ta.tag".attr === "tb.tag".attr))
+val optimized = Optimize.execute(query.analyze)
+val correctAnswer = query.analyze
+comparePlans(optimized, correctAnswer)
+  }
+
   test("Propagate in expand") {
 val c1 = Literal(1).as('a)
 val c2 = Literal(2).as('b)

http://git-wip-us.apache.org/repos/asf/spark/blob/e30fe1c6/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql
--
diff --git a/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql b/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql
new file mode 100644
index 000..38739cb
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql
@@ -0,0 +1,17 @@
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a);
+CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a);
+CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a);
+CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a);
+
+CREATE TEMPORARY VIEW ta AS
+SELECT a, 'a' AS tag FROM t1
+UNION ALL
+SELECT a, 'b' AS tag FROM t2;
+
+CREATE TEMPORARY VIEW tb AS
+SELECT a, 'a' AS tag FROM t3
+UNION ALL
+SELECT a, 'b' AS tag FROM t4;
+
+-- SPARK-19766 Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule.

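The archived diff cuts off before the query itself, so the following is only an illustrative Scala sketch of the kind of inner join the new fixture exercises, assuming an active SparkSession `spark` and the temporary views created above:

    // Assumes the temporary views t1..t4, ta and tb from inner-join.sql exist
    // in the current session.
    val joined = spark.sql(
      """
        |SELECT ta.a, ta.tag
        |FROM ta
        |JOIN tb
        |  ON ta.a = tb.a AND ta.tag = tb.tag
      """.stripMargin)

    // The `if (!stop)` guard added to FoldablePropagation keeps the optimizer from
    // rewriting ta.tag / tb.tag, which are constant only within each UNION ALL
    // branch, into literals inside the join condition, so the equality is
    // evaluated per row and the join returns the correct result.
    joined.show()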
spark git commit: [SPARK-19704][ML] AFTSurvivalRegression should support numeric censorCol

2017-03-02 Thread mlnick
Repository: spark
Updated Branches:
  refs/heads/master 625cfe09e -> 50c08e82f


[SPARK-19704][ML] AFTSurvivalRegression should support numeric censorCol

## What changes were proposed in this pull request?
Make `AFTSurvivalRegression` support a numeric censorCol.
## How was this patch tested?
Existing tests and newly added tests.

Author: Zheng RuiFeng 

Closes #17034 from zhengruifeng/aft_numeric_censor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50c08e82
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50c08e82
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50c08e82

Branch: refs/heads/master
Commit: 50c08e82f011dd31b4ff7ff2b45fb9fb4c0e3231
Parents: 625cfe0
Author: Zheng RuiFeng 
Authored: Thu Mar 2 13:34:04 2017 +0200
Committer: Nick Pentreath 
Committed: Thu Mar 2 13:34:04 2017 +0200

--
 .../ml/regression/AFTSurvivalRegression.scala   |  6 ++--
 .../ml/regression/IsotonicRegression.scala  |  2 +-
 .../regression/AFTSurvivalRegressionSuite.scala | 34 +++-
 3 files changed, 37 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/50c08e82/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 2f78dd3..4b36083 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -106,7 +106,7 @@ private[regression] trait AFTSurvivalRegressionParams extends Params
   fitting: Boolean): StructType = {
 SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
 if (fitting) {
-  SchemaUtils.checkColumnType(schema, $(censorCol), DoubleType)
+  SchemaUtils.checkNumericType(schema, $(censorCol))
   SchemaUtils.checkNumericType(schema, $(labelCol))
 }
 if (hasQuantilesCol) {
@@ -200,8 +200,8 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
* and put it in an RDD with strong types.
*/
   protected[ml] def extractAFTPoints(dataset: Dataset[_]): RDD[AFTPoint] = {
-dataset.select(col($(featuresCol)), col($(labelCol)).cast(DoubleType), col($(censorCol)))
-  .rdd.map {
+dataset.select(col($(featuresCol)), col($(labelCol)).cast(DoubleType),
+  col($(censorCol)).cast(DoubleType)).rdd.map {
 case Row(features: Vector, label: Double, censor: Double) =>
   AFTPoint(features, label, censor)
   }

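With the change above, the censor column only has to be numeric, since it is now cast to `DoubleType` internally. A minimal Scala sketch, assuming an active SparkSession `spark`; the data and column names are made up for illustration:

    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.regression.AFTSurvivalRegression

    // The censor column is an Int (1 = event observed, 0 = censored); before this
    // patch it had to be a DoubleType column.
    val data = spark.createDataFrame(Seq(
      (1.218, 1, Vectors.dense(1.560, -0.605)),
      (2.949, 0, Vectors.dense(0.346, 2.158)),
      (3.627, 0, Vectors.dense(1.380, 0.231)),
      (0.273, 1, Vectors.dense(0.520, 1.151)),
      (4.199, 0, Vectors.dense(0.795, -0.226))
    )).toDF("label", "censor", "features")

    val aft = new AFTSurvivalRegression().setCensorCol("censor")
    val model = aft.fit(data)
    model.transform(data).select("label", "censor", "prediction").show()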
http://git-wip-us.apache.org/repos/asf/spark/blob/50c08e82/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
index a6c2943..529f66e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -49,7 +49,7 @@ private[regression] trait IsotonicRegressionBase extends Params with HasFeatures
*/
   final val isotonic: BooleanParam =
 new BooleanParam(this, "isotonic",
-  "whether the output sequence should be isotonic/increasing (true) or" +
+  "whether the output sequence should be isotonic/increasing (true) or " +
 "antitonic/decreasing (false)")
 
   /** @group getParam */

http://git-wip-us.apache.org/repos/asf/spark/blob/50c08e82/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
index 0fdfdf3..3cd4b0a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.random.{ExponentialGenerator, WeibullGenerator}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.types._
 
 class AFTSurvivalRegressionSuite
   extends SparkFunSuite with MLlibTestSparkContext 

spark git commit: [SPARK-19733][ML] Removed unnecessary castings and refactored checked casts in ALS.

2017-03-02 Thread mlnick
Repository: spark
Updated Branches:
  refs/heads/master 8d6ef895e -> 625cfe09e


[SPARK-19733][ML] Removed unnecessary castings and refactored checked casts in ALS.

## What changes were proposed in this pull request?

The original ALS performed unnecessary casts on the user and item ids because the protected checkedCast() method required a Double. This change removes those casts and refactors the method to accept Any and handle all permitted numeric values efficiently.

## How was this patch tested?

Ran the unit tests and manually validated the result of checkedCast for various legal and illegal values.

Author: Vasilis Vryniotis 

Closes #17059 from datumbox/als_casting_fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/625cfe09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/625cfe09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/625cfe09

Branch: refs/heads/master
Commit: 625cfe09e673bfcb95e361ce19b534cf0a3c782c
Parents: 8d6ef89
Author: Vasilis Vryniotis 
Authored: Thu Mar 2 12:37:42 2017 +0200
Committer: Nick Pentreath 
Committed: Thu Mar 2 12:37:42 2017 +0200

--
 .../apache/spark/ml/recommendation/ALS.scala| 31 +---
 .../spark/ml/recommendation/ALSSuite.scala  | 84 +---
 2 files changed, 95 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/625cfe09/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 04273a4..799e881 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -80,14 +80,24 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo

   /**
   * Attempts to safely cast a user/item id to an Int. Throws an exception if the value is
-   * out of integer range.
+   * out of integer range or contains a fractional part.
*/
-  protected val checkedCast = udf { (n: Double) =>
-if (n > Int.MaxValue || n < Int.MinValue) {
-  throw new IllegalArgumentException(s"ALS only supports values in Integer range for columns " +
-s"${$(userCol)} and ${$(itemCol)}. Value $n was out of Integer range.")
-} else {
-  n.toInt
+  protected[recommendation] val checkedCast = udf { (n: Any) =>
+n match {
+  case v: Int => v // Avoid unnecessary casting
+  case v: Number =>
+val intV = v.intValue
+// Checks if number within Int range and has no fractional part.
+if (v.doubleValue == intV) {
+  intV
+} else {
+  throw new IllegalArgumentException(s"ALS only supports values in Integer range " +
+s"and without fractional part for columns ${$(userCol)} and ${$(itemCol)}. " +
+s"Value $n was either out of Integer range or contained a fractional part that " +
+s"could not be converted.")
+}
+  case _ => throw new IllegalArgumentException(s"ALS only supports values in Integer range " +
+s"for columns ${$(userCol)} and ${$(itemCol)}. Value $n was not numeric.")
 }
   }
 
@@ -288,9 +298,9 @@ class ALSModel private[ml] (
 }
 val predictions = dataset
   .join(userFactors,
-checkedCast(dataset($(userCol)).cast(DoubleType)) === userFactors("id"), "left")
+checkedCast(dataset($(userCol))) === userFactors("id"), "left")
   .join(itemFactors,
-checkedCast(dataset($(itemCol)).cast(DoubleType)) === itemFactors("id"), "left")
+checkedCast(dataset($(itemCol))) === itemFactors("id"), "left")
   .select(dataset("*"),
  predict(userFactors("features"), itemFactors("features")).as($(predictionCol)))
 getColdStartStrategy match {
@@ -491,8 +501,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]

 val r = if ($(ratingCol) != "") col($(ratingCol)).cast(FloatType) else lit(1.0f)
 val ratings = dataset
-  .select(checkedCast(col($(userCol)).cast(DoubleType)),
-checkedCast(col($(itemCol)).cast(DoubleType)), r)
+  .select(checkedCast(col($(userCol))), checkedCast(col($(itemCol))), r)
   .rdd
   .map { row =>
 Rating(row.getInt(0), row.getInt(1), row.getFloat(2))

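From a user's point of view, the refactored `checkedCast` accepts any numeric user/item id column whose values fit in the Int range and have no fractional part, without the previous cast to `DoubleType`. A minimal Scala sketch, assuming an active SparkSession `spark`; the DataFrame and column names are hypothetical:

    import org.apache.spark.ml.recommendation.ALS

    // Ratings whose id columns are Long rather than Int; values must still fit in
    // the Int range and have no fractional part, otherwise checkedCast throws an
    // IllegalArgumentException.
    val ratings = spark.createDataFrame(Seq(
      (0L, 10L, 4.0f),
      (0L, 11L, 1.0f),
      (1L, 10L, 5.0f),
      (1L, 12L, 3.0f)
    )).toDF("userId", "movieId", "rating")

    val als = new ALS()
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")
      .setRank(4)
      .setMaxIter(5)

    val model = als.fit(ratings)
    model.transform(ratings).show()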
http://git-wip-us.apache.org/repos/asf/spark/blob/625cfe09/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
--
diff --git 

spark git commit: [SPARK-18352][DOCS] wholeFile JSON update doc and programming guide

2017-03-02 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master d2a879762 -> 8d6ef895e


[SPARK-18352][DOCS] wholeFile JSON update doc and programming guide

## What changes were proposed in this pull request?

Update the R docs and the SQL programming guide. Clarify the default behavior for all languages.

## How was this patch tested?

manually

Author: Felix Cheung 

Closes #17128 from felixcheung/jsonwholefiledoc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d6ef895
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d6ef895
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d6ef895

Branch: refs/heads/master
Commit: 8d6ef895ee492b8febbaac7ab2ef2c907b48fa4a
Parents: d2a8797
Author: Felix Cheung 
Authored: Thu Mar 2 01:02:38 2017 -0800
Committer: Felix Cheung 
Committed: Thu Mar 2 01:02:38 2017 -0800

--
 R/pkg/R/SQLContext.R| 10 +---
 docs/sql-programming-guide.md   | 26 +++-
 python/pyspark/sql/readwriter.py|  4 +--
 python/pyspark/sql/streaming.py |  4 +--
 .../org/apache/spark/sql/DataFrameReader.scala  |  4 +--
 .../spark/sql/streaming/DataStreamReader.scala  |  4 +--
 6 files changed, 30 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8d6ef895/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index e771a05..8354f70 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -332,8 +332,10 @@ setMethod("toDF", signature(x = "RDD"),
 
 #' Create a SparkDataFrame from a JSON file.
 #'
-#' Loads a JSON file (\href{http://jsonlines.org/}{JSON Lines text format or newline-delimited JSON}
-#' ), returning the result as a SparkDataFrame
+#' Loads a JSON file, returning the result as a SparkDataFrame
+#' By default, (\href{http://jsonlines.org/}{JSON Lines text format or newline-delimited JSON}
+#' ) is supported. For JSON (one record per file), set a named property \code{wholeFile} to
+#' \code{TRUE}.
 #' It goes through the entire dataset once to determine the schema.
 #'
 #' @param path Path of file to read. A vector of multiple paths is allowed.
@@ -346,6 +348,7 @@ setMethod("toDF", signature(x = "RDD"),
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
+#' df <- read.json(path, wholeFile = TRUE)
 #' df <- jsonFile(path)
 #' }
 #' @name read.json
@@ -778,6 +781,7 @@ dropTempView <- function(viewName) {
 #' @return SparkDataFrame
 #' @rdname read.df
 #' @name read.df
+#' @seealso \link{read.json}
 #' @export
 #' @examples
 #'\dontrun{
@@ -785,7 +789,7 @@ dropTempView <- function(viewName) {
 #' df1 <- read.df("path/to/file.json", source = "json")
 #' schema <- structType(structField("name", "string"),
 #'  structField("info", "map"))
-#' df2 <- read.df(mapTypeJsonPath, "json", schema)
+#' df2 <- read.df(mapTypeJsonPath, "json", schema, wholeFile = TRUE)
 #' df3 <- loadDF("data/test_table", "parquet", mergeSchema = "true")
 #' }
 #' @name read.df

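For the Scala API, the same `wholeFile` option documented above for R is set on `DataFrameReader` (the option was later renamed to `multiLine` before the 2.2.0 release, but at the time of this commit it is `wholeFile`). A minimal sketch, with hypothetical paths:

    // Default behaviour: each line must be a separate, self-contained JSON record
    // (JSON Lines / newline-delimited JSON).
    val jsonLinesDF = spark.read.json("path/to/people.jsonl")

    // One (possibly pretty-printed) JSON record per file: enable wholeFile.
    val wholeFileDF = spark.read
      .option("wholeFile", true)
      .json("path/to/people.json")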
http://git-wip-us.apache.org/repos/asf/spark/blob/8d6ef895/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 2dd1ab6..b077575 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -386,8 +386,8 @@ For example:
 
 The [built-in DataFrames functions](api/scala/index.html#org.apache.spark.sql.functions$) provide common
 aggregations such as `count()`, `countDistinct()`, `avg()`, `max()`, `min()`, etc.
-While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in 
-[Scala](api/scala/index.html#org.apache.spark.sql.expressions.scalalang.typed$) and 
+While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in
+[Scala](api/scala/index.html#org.apache.spark.sql.expressions.scalalang.typed$) and
 [Java](api/java/org/apache/spark/sql/expressions/javalang/typed.html) to work with strongly typed Datasets.
 Moreover, users are not limited to the predefined aggregate functions and can create their own.
 
@@ -397,7 +397,7 @@ Moreover, users are not limited to the predefined aggregate functions and can cr
 
 
 
-Users have to extend the [UserDefinedAggregateFunction](api/scala/index.html#org.apache.spark.sql.expressions.UserDefinedAggregateFunction) 
+Users have to extend the [UserDefinedAggregateFunction](api/scala/index.html#org.apache.spark.sql.expressions.UserDefinedAggregateFunction)
 abstract class to implement a custom untyped