spark git commit: [SPARK-18726][SQL] resolveRelation for FileFormat DataSource don't need to listFiles twice
Repository: spark
Updated Branches: refs/heads/master e24f21b5f -> 982f3223b

[SPARK-18726][SQL] resolveRelation for FileFormat DataSource don't need to listFiles twice

## What changes were proposed in this pull request?

Currently, when we call `resolveRelation` for a `FileFormat DataSource` without providing a user schema, `listFiles` is executed twice in `InMemoryFileIndex` during `resolveRelation`. This PR adds a `FileStatusCache` to `DataSource`, which avoids listing the files twice. However, there is a bug in `InMemoryFileIndex`, see [SPARK-19748](https://github.com/apache/spark/pull/17079) and [SPARK-19761](https://github.com/apache/spark/pull/17093), so this PR should land after SPARK-19748/SPARK-19761.

## How was this patch tested?

unit test added

Author: windpiger

Closes #17081 from windpiger/resolveDataSourceScanFilesTwice.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/982f3223
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/982f3223
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/982f3223
Branch: refs/heads/master
Commit: 982f3223b4f55f988091402063fe8746c5e2cee4
Parents: e24f21b
Author: windpiger
Authored: Thu Mar 2 23:54:01 2017 -0800
Committer: Wenchen Fan
Committed: Thu Mar 2 23:54:01 2017 -0800
--
 .../spark/sql/execution/datasources/DataSource.scala | 13 +
 .../sql/hive/PartitionedTablePerfStatsSuite.scala    | 11 +++
 2 files changed, 20 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/982f3223/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index c1353d4..4947dfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -106,10 +106,13 @@ case class DataSource(
    * be any further inference in any triggers.
    *
    * @param format the file format object for this DataSource
+   * @param fileStatusCache the shared cache for file statuses to speed up listing
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */
-  private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
+  private def getOrInferFileFormatSchema(
+      format: FileFormat,
+      fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
     // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
     // in streaming mode, we have already inferred and registered partition columns, we will
     // never have to materialize the lazy val below
@@ -122,7 +125,7 @@ case class DataSource(
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
-      new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
+      new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
     }
     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
@@ -354,7 +357,8 @@ case class DataSource(
           globPath
         }.toArray

-        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+        val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)

         val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
             catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -364,7 +368,8 @@ case class DataSource(
             catalogTable.get,
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
         } else {
-          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
+          new InMemoryFileIndex(
+            sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
         }

         HadoopFsRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/982f3223/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
--
diff --git
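The idea behind the `FileStatusCache` change above is that schema inference and relation resolution both need the same file listing, so listing results should be computed once and shared. A minimal, language-neutral Python sketch of that memoization pattern (the class and names here are illustrative, not Spark's actual implementation):

```python
import os
import tempfile

class FileStatusCache:
    """Memoizes directory listings so repeated scans of the same path
    do not hit the (potentially slow, e.g. HDFS/S3) file system twice."""

    def __init__(self):
        self._cache = {}
        self.misses = 0  # counts actual file-system listings performed

    def list_files(self, path):
        if path not in self._cache:
            self.misses += 1
            self._cache[path] = sorted(os.listdir(path))
        return self._cache[path]

# Both the schema-inference step and the relation-resolution step consult
# the same cache, so only the first call touches the file system.
d = tempfile.mkdtemp()
open(os.path.join(d, "part-00000.parquet"), "w").close()

cache = FileStatusCache()
files_for_schema_inference = cache.list_files(d)  # first listing: hits the FS
files_for_relation = cache.list_files(d)          # second: served from cache
print(cache.misses)  # 1
```

The Spark patch follows the same shape: a cache created via `FileStatusCache.getOrCreate(sparkSession)` is threaded into both `getOrInferFileFormatSchema` and the `InMemoryFileIndex` constructor, so the second consumer reuses the first listing.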
spark git commit: [SPARK-19779][SS] Delete needless tmp file after restart structured streaming job
Repository: spark
Updated Branches: refs/heads/branch-2.1 3a7591ad5 -> 1237aaea2

[SPARK-19779][SS] Delete needless tmp file after restart structured streaming job

## What changes were proposed in this pull request?

[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)

The PR (https://github.com/apache/spark/pull/17012) fixed restarting a Structured Streaming application that uses HDFS as its file system, but it left a problem: the temporary copy of a delta file is still kept in HDFS, and Structured Streaming never deletes the tmp file generated during such a restart.

## How was this patch tested?

unit tests

Author: guifeng

Closes #17124 from gf53520/SPARK-19779.

(cherry picked from commit e24f21b5f8365ed25346e986748b393e0b4be25c)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1237aaea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1237aaea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1237aaea
Branch: refs/heads/branch-2.1
Commit: 1237aaea279d6aac504ae1e3265c0b53779b5303
Parents: 3a7591a
Author: guifeng
Authored: Thu Mar 2 21:19:29 2017 -0800
Committer: Shixiong Zhu
Committed: Thu Mar 2 21:19:40 2017 -0800
--
 .../streaming/state/HDFSBackedStateStoreProvider.scala        | 4 +++-
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 7 +++
 2 files changed, 10 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1237aaea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
--

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 2d29940..ab1204a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider(
       // semantically correct because Structured Streaming requires rerunning a batch should
       // generate the same output. (SPARK-19677)
       // scalastyle:on
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)

http://git-wip-us.apache.org/repos/asf/spark/blob/1237aaea/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
--

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 21a0a10..255378c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io.{File, IOException}
 import java.net.URI

+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random

+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = newStoreProvider(hadoopConf = conf)
     provider.getStore(0).commit()
     provider.getStore(0).commit()
+
+    // Verify we don't leak temp files
+    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+      null, true).asScala.filter(_.getName.startsWith("temp-"))
+    assert(tempFiles.isEmpty)
   }

   test("corrupted file handling") {

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19779][SS] Delete needless tmp file after restart structured streaming job
Repository: spark
Updated Branches: refs/heads/branch-2.0 491b47a16 -> 73801880f

[SPARK-19779][SS] Delete needless tmp file after restart structured streaming job

## What changes were proposed in this pull request?

[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)

The PR (https://github.com/apache/spark/pull/17012) fixed restarting a Structured Streaming application that uses HDFS as its file system, but it left a problem: the temporary copy of a delta file is still kept in HDFS, and Structured Streaming never deletes the tmp file generated during such a restart.

## How was this patch tested?

unit tests

Author: guifeng

Closes #17124 from gf53520/SPARK-19779.

(cherry picked from commit e24f21b5f8365ed25346e986748b393e0b4be25c)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73801880
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73801880
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73801880
Branch: refs/heads/branch-2.0
Commit: 73801880fcc79b611e796dd0804b8b6df12da4e9
Parents: 491b47a
Author: guifeng
Authored: Thu Mar 2 21:19:29 2017 -0800
Committer: Shixiong Zhu
Committed: Thu Mar 2 21:19:49 2017 -0800
--
 .../streaming/state/HDFSBackedStateStoreProvider.scala        | 4 +++-
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 7 +++
 2 files changed, 10 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/73801880/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
--

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index f234ca6..759b1c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -263,7 +263,9 @@ private[state] class HDFSBackedStateStoreProvider(
       // semantically correct because Structured Streaming requires rerunning a batch should
       // generate the same output. (SPARK-19677)
       // scalastyle:on
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)

http://git-wip-us.apache.org/repos/asf/spark/blob/73801880/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
--

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index b5f161c..e375604 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io.{File, IOException}
 import java.net.URI

+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random

+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = newStoreProvider(hadoopConf = conf)
     provider.getStore(0).commit()
     provider.getStore(0).commit()
+
+    // Verify we don't leak temp files
+    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+      null, true).asScala.filter(_.getName.startsWith("temp-"))
+    assert(tempFiles.isEmpty)
   }

   test("corrupted file handling") {
spark git commit: [SPARK-19602][SQL][TESTS] Add tests for qualified column names
Repository: spark
Updated Branches: refs/heads/master 93ae176e8 -> f37bb1430

[SPARK-19602][SQL][TESTS] Add tests for qualified column names

## What changes were proposed in this pull request?

- Add tests covering different scenarios with qualified column names
- Please see Section 2 in the design doc for the various test scenarios [here](https://issues.apache.org/jira/secure/attachment/12854681/Design_ColResolution_JIRA19602.pdf)
- As part of SPARK-19602, changes are made to support three-part column names. In order to aid review and reduce the diff, the test scenarios are separated out into this PR.

## How was this patch tested?

- This is a **test only** change. The individual test suites were run successfully.

Author: Sunitha Kambhampati

Closes #17067 from skambha/colResolutionTests.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f37bb143
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f37bb143
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f37bb143
Branch: refs/heads/master
Commit: f37bb143022ea10107877c80c5c73bd77aeda7ff
Parents: 93ae176
Author: Sunitha Kambhampati
Authored: Thu Mar 2 21:19:22 2017 -0800
Committer: Xiao Li
Committed: Thu Mar 2 21:19:22 2017 -0800
--
 .../inputs/columnresolution-negative.sql        |  36 ++
 .../sql-tests/inputs/columnresolution-views.sql |  25 ++
 .../sql-tests/inputs/columnresolution.sql       |  88
 .../results/columnresolution-negative.sql.out   | 240 ++
 .../results/columnresolution-views.sql.out      | 140 ++
 .../sql-tests/results/columnresolution.sql.out  | 447 +++
 .../sql-tests/results/inner-join.sql.out        |   3 +-
 .../apache/spark/sql/SQLQueryTestSuite.scala    |   6 +-
 8 files changed, 980 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f37bb143/sql/core/src/test/resources/sql-tests/inputs/columnresolution-negative.sql
--

diff --git a/sql/core/src/test/resources/sql-tests/inputs/columnresolution-negative.sql b/sql/core/src/test/resources/sql-tests/inputs/columnresolution-negative.sql
new file mode 100644
index 000..1caa45c
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/columnresolution-negative.sql
@@ -0,0 +1,36 @@
+-- Negative testcases for column resolution
+CREATE DATABASE mydb1;
+USE mydb1;
+CREATE TABLE t1 USING parquet AS SELECT 1 AS i1;
+
+CREATE DATABASE mydb2;
+USE mydb2;
+CREATE TABLE t1 USING parquet AS SELECT 20 AS i1;
+
+-- Negative tests: column resolution scenarios with ambiguous cases in join queries
+SET spark.sql.crossJoin.enabled = true;
+USE mydb1;
+SELECT i1 FROM t1, mydb1.t1;
+SELECT t1.i1 FROM t1, mydb1.t1;
+SELECT mydb1.t1.i1 FROM t1, mydb1.t1;
+SELECT i1 FROM t1, mydb2.t1;
+SELECT t1.i1 FROM t1, mydb2.t1;
+USE mydb2;
+SELECT i1 FROM t1, mydb1.t1;
+SELECT t1.i1 FROM t1, mydb1.t1;
+SELECT i1 FROM t1, mydb2.t1;
+SELECT t1.i1 FROM t1, mydb2.t1;
+SELECT db1.t1.i1 FROM t1, mydb2.t1;
+SET spark.sql.crossJoin.enabled = false;
+
+-- Negative tests
+USE mydb1;
+SELECT mydb1.t1 FROM t1;
+SELECT t1.x.y.* FROM t1;
+SELECT t1 FROM mydb1.t1;
+USE mydb2;
+SELECT mydb1.t1.i1 FROM t1;
+
+-- reset
+DROP DATABASE mydb1 CASCADE;
+DROP DATABASE mydb2 CASCADE;

http://git-wip-us.apache.org/repos/asf/spark/blob/f37bb143/sql/core/src/test/resources/sql-tests/inputs/columnresolution-views.sql
--

diff --git a/sql/core/src/test/resources/sql-tests/inputs/columnresolution-views.sql b/sql/core/src/test/resources/sql-tests/inputs/columnresolution-views.sql
new file mode 100644
index 000..d3f9287
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/columnresolution-views.sql
@@ -0,0 +1,25 @@
+-- Tests for qualified column names for the view code-path
+-- Test scenario with Temporary view
+CREATE OR REPLACE TEMPORARY VIEW view1 AS SELECT 2 AS i1;
+SELECT view1.* FROM view1;
+SELECT * FROM view1;
+SELECT view1.i1 FROM view1;
+SELECT i1 FROM view1;
+SELECT a.i1 FROM view1 AS a;
+SELECT i1 FROM view1 AS a;
+-- cleanup
+DROP VIEW view1;
+
+-- Test scenario with Global Temp view
+CREATE OR REPLACE GLOBAL TEMPORARY VIEW view1 as SELECT 1 as i1;
+SELECT * FROM global_temp.view1;
+-- TODO: Support this scenario
+SELECT global_temp.view1.* FROM global_temp.view1;
+SELECT i1 FROM global_temp.view1;
+-- TODO: Support this scenario
+SELECT global_temp.view1.i1 FROM global_temp.view1;
+SELECT view1.i1 FROM global_temp.view1;
+SELECT a.i1 FROM global_temp.view1 AS a;
+SELECT i1 FROM global_temp.view1 AS a;
+-- cleanup
+DROP VIEW global_temp.view1;
spark git commit: [SPARK-19779][SS] Delete needless tmp file after restart structured streaming job
Repository: spark
Updated Branches: refs/heads/master f37bb1430 -> e24f21b5f

[SPARK-19779][SS] Delete needless tmp file after restart structured streaming job

## What changes were proposed in this pull request?

[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)

The PR (https://github.com/apache/spark/pull/17012) fixed restarting a Structured Streaming application that uses HDFS as its file system, but it left a problem: the temporary copy of a delta file is still kept in HDFS, and Structured Streaming never deletes the tmp file generated during such a restart.

## How was this patch tested?

unit tests

Author: guifeng

Closes #17124 from gf53520/SPARK-19779.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e24f21b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e24f21b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e24f21b5
Branch: refs/heads/master
Commit: e24f21b5f8365ed25346e986748b393e0b4be25c
Parents: f37bb14
Author: guifeng
Authored: Thu Mar 2 21:19:29 2017 -0800
Committer: Shixiong Zhu
Committed: Thu Mar 2 21:19:29 2017 -0800
--
 .../streaming/state/HDFSBackedStateStoreProvider.scala        | 4 +++-
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 7 +++
 2 files changed, 10 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e24f21b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
--

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 2d29940..ab1204a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider(
       // semantically correct because Structured Streaming requires rerunning a batch should
       // generate the same output. (SPARK-19677)
       // scalastyle:on
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)

http://git-wip-us.apache.org/repos/asf/spark/blob/e24f21b5/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
--

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index dc4e935..e848f74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io.{File, IOException}
 import java.net.URI

+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random

+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = newStoreProvider(hadoopConf = conf)
     provider.getStore(0).commit()
     provider.getStore(0).commit()
+
+    // Verify we don't leak temp files
+    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+      null, true).asScala.filter(_.getName.startsWith("temp-"))
+    assert(tempFiles.isEmpty)
   }

   test("corrupted file handling") {
spark git commit: [SPARK-19745][ML] SVCAggregator captures coefficients in its closure
Repository: spark
Updated Branches: refs/heads/master 8417a7ae6 -> 93ae176e8

[SPARK-19745][ML] SVCAggregator captures coefficients in its closure

## What changes were proposed in this pull request?

JIRA: [SPARK-19745](https://issues.apache.org/jira/browse/SPARK-19745)

Reorganize SVCAggregator to avoid serializing the coefficients. This patch also makes the gradient array a `lazy val`, which avoids materializing a large array on the driver before shipping the class to the executors. This improvement stems from https://github.com/apache/spark/pull/16037. Actually, probably all ML aggregators can benefit from this. We can either: a) separate the gradient improvement into another patch, b) keep what's here _plus_ add the lazy evaluation to all other aggregators in this patch, or c) keep it as is.

## How was this patch tested?

This is an interesting question! I don't know of a reasonable way to test this right now. Ideally, we could perform an optimization and look at the shuffle write data for each task, and we could compare the size to what we know it should be: `numCoefficients * 8 bytes`. Not sure if there is a good way to do that right now? We could discuss this here or in another JIRA, but I suspect it would be a significant undertaking.

Author: sethah

Closes #17076 from sethah/svc_agg.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93ae176e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93ae176e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93ae176e
Branch: refs/heads/master
Commit: 93ae176e8943d6b346c80deea778bffd188366a1
Parents: 8417a7a
Author: sethah
Authored: Thu Mar 2 19:38:25 2017 -0800
Committer: Yanbo Liang
Committed: Thu Mar 2 19:38:25 2017 -0800
--
 .../spark/ml/classification/LinearSVC.scala     | 29
 .../ml/classification/LogisticRegression.scala  |  2 +-
 .../spark/ml/clustering/GaussianMixture.scala   |  6 ++--
 .../ml/regression/AFTSurvivalRegression.scala   |  2 +-
 .../spark/ml/regression/LinearRegression.scala  |  2 +-
 .../ml/classification/LinearSVCSuite.scala      | 17 +++-
 6 files changed, 34 insertions(+), 24 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/93ae176e/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
--

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index bf6e76d..f76b14e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -440,19 +440,14 @@ private class LinearSVCAggregator(

   private val numFeatures: Int = bcFeaturesStd.value.length
   private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
-  private val coefficients: Vector = bcCoefficients.value
   private var weightSum: Double = 0.0
   private var lossSum: Double = 0.0
-  require(numFeaturesPlusIntercept == coefficients.size, s"Dimension mismatch. Coefficients " +
-    s"length ${coefficients.size}, FeaturesStd length ${numFeatures}, fitIntercept: $fitIntercept")
-
-  private val coefficientsArray = coefficients match {
-    case dv: DenseVector => dv.values
-    case _ =>
-      throw new IllegalArgumentException(
-        s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
+  @transient private lazy val coefficientsArray = bcCoefficients.value match {
+    case DenseVector(values) => values
+    case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
+      s" but got type ${bcCoefficients.value.getClass}.")
   }
-  private val gradientSumArray = Array.fill[Double](coefficientsArray.length)(0)
+  private lazy val gradientSumArray = new Array[Double](numFeaturesPlusIntercept)

   /**
    * Add a new training instance to this LinearSVCAggregator, and update the loss and gradient
@@ -463,6 +458,9 @@ private class LinearSVCAggregator(
    */
   def add(instance: Instance): this.type = {
     instance match { case Instance(label, weight, features) =>
+      require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
+      require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
+        s" Expecting $numFeatures but got ${features.size}.")
       if (weight == 0.0) return this
       val localFeaturesStd = bcFeaturesStd.value
       val localCoefficients = coefficientsArray
@@ -530,18 +528,15 @@ private class LinearSVCAggregator(
     this
   }

-  def loss:
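The fix above hinges on a general serialization pattern: an eagerly assigned field (`private val coefficients = bcCoefficients.value`) is captured in the task closure and shipped with it, while a `@transient lazy val` is dropped from serialization and re-materialized from the broadcast handle on each executor. A Python sketch of the same idea using `pickle` (the classes and `make_big` factory are illustrative stand-ins, not Spark code):

```python
import pickle

def make_big():
    """Stand-in for bcCoefficients.value: produces the large payload on demand."""
    return list(range(100_000))

class EagerAgg:
    """Materializes the big array at construction time, so the whole
    array is serialized along with the object (the pre-fix behavior)."""
    def __init__(self, data):
        self.data = data

class LazyAgg:
    """Keeps only the factory (the broadcast handle); the array is
    materialized on first access and excluded from serialization,
    mimicking '@transient private lazy val'."""
    def __init__(self, factory):
        self.factory = factory
        self._data = None

    @property
    def data(self):
        if self._data is None:
            self._data = self.factory()  # materialized lazily, per process
        return self._data

    def __getstate__(self):
        # Drop any cached array: only the factory reference is shipped.
        return {"factory": self.factory}

    def __setstate__(self, state):
        self.factory = state["factory"]
        self._data = None

eager_bytes = len(pickle.dumps(EagerAgg(make_big())))
lazy_bytes = len(pickle.dumps(LazyAgg(make_big)))
# lazy_bytes is tiny (a function reference); eager_bytes carries the array.
```

This mirrors why the patch also avoids allocating `gradientSumArray` eagerly on the driver: with a `lazy val`, the array is only created where the aggregator is actually used.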
spark git commit: [SPARK-19750][UI][BRANCH-2.1] Fix redirect issue from http to https
Repository: spark
Updated Branches: refs/heads/branch-2.0 e30fe1c6a -> 491b47a16

[SPARK-19750][UI][BRANCH-2.1] Fix redirect issue from http to https

## What changes were proposed in this pull request?

If the Spark UI port (4040) is not set, port number 0 is chosen, which makes the HTTPS port also choose 0. The Spark 2.1 code then uses this HTTPS port (0) for redirects, so when a redirect is triggered it points to a wrong URL, like:

```
/tmp/temp$ wget http://172.27.25.134:55015
--2017-02-23 12:13:54--  http://172.27.25.134:55015/
Connecting to 172.27.25.134:55015... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://172.27.25.134:0/ [following]
--2017-02-23 12:13:54--  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address. Retrying.
--2017-02-23 12:13:55--  (try: 2)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address. Retrying.
--2017-02-23 12:13:57--  (try: 3)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address. Retrying.
--2017-02-23 12:14:00--  (try: 4)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address. Retrying.
```

So instead of using 0 for the redirect, we should pick the actually bound port. This issue only exists in Spark 2.1 and earlier, and can be reproduced in yarn cluster mode.

## How was this patch tested?

The current redirect unit test doesn't verify this issue, so it is extended to do the correct verification.

Author: jerryshao

Closes #17083 from jerryshao/SPARK-19750.

(cherry picked from commit 3a7591ad5315308d24c0e444ce304ff78aef2304)
Signed-off-by: Marcelo Vanzin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/491b47a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/491b47a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/491b47a1
Branch: refs/heads/branch-2.0
Commit: 491b47a162030cb0dbb81ae32898e109707a4709
Parents: e30fe1c
Author: jerryshao
Authored: Thu Mar 2 17:18:52 2017 -0800
Committer: Marcelo Vanzin
Committed: Thu Mar 2 17:19:04 2017 -0800
--
 .../main/scala/org/apache/spark/TestUtils.scala | 23
 .../scala/org/apache/spark/ui/JettyUtils.scala  | 10 +
 .../scala/org/apache/spark/ui/UISuite.scala     |  6 -
 3 files changed, 30 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/491b47a1/core/src/main/scala/org/apache/spark/TestUtils.scala
--

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index a76395b..3bf1d31 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -27,6 +27,7 @@ import java.util.Arrays
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 import javax.net.ssl._
+import javax.servlet.http.HttpServletResponse
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}

 import scala.collection.JavaConverters._
@@ -186,12 +187,12 @@ private[spark] object TestUtils {
   }

   /**
-   * Returns the response code from an HTTP(S) URL.
+   * Returns the response code and url (if redirected) from an HTTP(S) URL.
    */
-  def httpResponseCode(
+  def httpResponseCodeAndURL(
       url: URL,
       method: String = "GET",
-      headers: Seq[(String, String)] = Nil): Int = {
+      headers: Seq[(String, String)] = Nil): (Int, Option[String]) = {
     val connection = url.openConnection().asInstanceOf[HttpURLConnection]
     connection.setRequestMethod(method)
     headers.foreach { case (k, v) => connection.setRequestProperty(k, v) }
@@ -210,16 +211,30 @@ private[spark] object TestUtils {
       sslCtx.init(null, Array(trustManager), new SecureRandom())
       connection.asInstanceOf[HttpsURLConnection].setSSLSocketFactory(sslCtx.getSocketFactory())
       connection.asInstanceOf[HttpsURLConnection].setHostnameVerifier(verifier)
+      connection.setInstanceFollowRedirects(false)
     }

     try {
       connection.connect()
-      connection.getResponseCode()
+      if (connection.getResponseCode == HttpServletResponse.SC_FOUND) {
+        (connection.getResponseCode, Option(connection.getHeaderField("Location")))
+      } else {
+        (connection.getResponseCode(), None)
+      }
     } finally {
       connection.disconnect()
     }
   }

+  /**
+   * Returns the response code from an HTTP(S) URL.
+   */
+  def httpResponseCode(
+      url: URL,
+      method: String = "GET",
+      headers:
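The root cause of the bug above is using the *requested* port (0, meaning "OS picks one") instead of the *bound* port when building the redirect URL. A small Python socket sketch of the correct pattern (addresses and names here are illustrative):

```python
import socket

# Requesting port 0 asks the OS to pick any free port.
requested_port = 0
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", requested_port))
server.listen(1)

# Ask the socket which port was actually bound; that is the port a
# redirect URL must advertise, not the requested 0.
bound_port = server.getsockname()[1]

redirect_location = f"https://127.0.0.1:{bound_port}/"    # correct: reachable
broken_location = f"https://127.0.0.1:{requested_port}/"  # the pre-fix bug: port 0
server.close()
```

The Spark fix applies the same principle to the Jetty HTTPS connector: the redirect target is computed from the connector's actual bound port after startup rather than from the configured (possibly 0) port.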
spark git commit: [SPARK-19750][UI][BRANCH-2.1] Fix redirect issue from http to https
Repository: spark Updated Branches: refs/heads/branch-2.1 27347b5f2 -> 3a7591ad5 [SPARK-19750][UI][BRANCH-2.1] Fix redirect issue from http to https ## What changes were proposed in this pull request? If spark ui port (4040) is not set, it will choose port number 0, this will make https port to also choose 0. And in Spark 2.1 code, it will use this https port (0) to do redirect, so when redirect triggered, it will point to a wrong url: like: ``` /tmp/temp$ wget http://172.27.25.134:55015 --2017-02-23 12:13:54-- http://172.27.25.134:55015/ Connecting to 172.27.25.134:55015... connected. HTTP request sent, awaiting response... 302 Found Location: https://172.27.25.134:0/ [following] --2017-02-23 12:13:54-- https://172.27.25.134:0/ Connecting to 172.27.25.134:0... failed: Can't assign requested address. Retrying. --2017-02-23 12:13:55-- (try: 2) https://172.27.25.134:0/ Connecting to 172.27.25.134:0... failed: Can't assign requested address. Retrying. --2017-02-23 12:13:57-- (try: 3) https://172.27.25.134:0/ Connecting to 172.27.25.134:0... failed: Can't assign requested address. Retrying. --2017-02-23 12:14:00-- (try: 4) https://172.27.25.134:0/ Connecting to 172.27.25.134:0... failed: Can't assign requested address. Retrying. ``` So instead of using 0 to do redirect, we should pick a bound port instead. This issue only exists in Spark 2.1-, and can be reproduced in yarn cluster mode. ## How was this patch tested? Current redirect UT doesn't verify this issue, so extend current UT to do correct verification. Author: jerryshaoCloses #17083 from jerryshao/SPARK-19750. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a7591ad Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a7591ad Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a7591ad Branch: refs/heads/branch-2.1 Commit: 3a7591ad5315308d24c0e444ce304ff78aef2304 Parents: 27347b5 Author: jerryshao Authored: Thu Mar 2 17:18:52 2017 -0800 Committer: Marcelo Vanzin Committed: Thu Mar 2 17:18:52 2017 -0800 -- .../main/scala/org/apache/spark/TestUtils.scala | 23 .../scala/org/apache/spark/ui/JettyUtils.scala | 10 + .../scala/org/apache/spark/ui/UISuite.scala | 6 - 3 files changed, 30 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3a7591ad/core/src/main/scala/org/apache/spark/TestUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index c3ccdb0..5cdc4ee 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -27,6 +27,7 @@ import java.util.Arrays import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.jar.{JarEntry, JarOutputStream} import javax.net.ssl._ +import javax.servlet.http.HttpServletResponse import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} import scala.collection.JavaConverters._ @@ -186,12 +187,12 @@ private[spark] object TestUtils { } /** - * Returns the response code from an HTTP(S) URL. + * Returns the response code and url (if redirected) from an HTTP(S) URL. 
*/ - def httpResponseCode( + def httpResponseCodeAndURL( url: URL, method: String = "GET", - headers: Seq[(String, String)] = Nil): Int = { + headers: Seq[(String, String)] = Nil): (Int, Option[String]) = { val connection = url.openConnection().asInstanceOf[HttpURLConnection] connection.setRequestMethod(method) headers.foreach { case (k, v) => connection.setRequestProperty(k, v) } @@ -210,16 +211,30 @@ private[spark] object TestUtils { sslCtx.init(null, Array(trustManager), new SecureRandom()) connection.asInstanceOf[HttpsURLConnection].setSSLSocketFactory(sslCtx.getSocketFactory()) connection.asInstanceOf[HttpsURLConnection].setHostnameVerifier(verifier) + connection.setInstanceFollowRedirects(false) } try { connection.connect() - connection.getResponseCode() + if (connection.getResponseCode == HttpServletResponse.SC_FOUND) { +(connection.getResponseCode, Option(connection.getHeaderField("Location"))) + } else { +(connection.getResponseCode(), None) + } } finally { connection.disconnect() } } + /** + * Returns the response code from an HTTP(S) URL. + */ + def httpResponseCode( + url: URL, + method: String = "GET", + headers: Seq[(String, String)] = Nil): Int = { +httpResponseCodeAndURL(url, method, headers)._1 + } }
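The patch above makes `TestUtils` probe an HTTP(S) endpoint with redirect-following disabled, so the test can assert on both the `302` status and the `Location` header (which, before the fix, pointed at port 0). A minimal Python sketch of the same probing pattern — not Spark's Scala `TestUtils`; the local 302 server and all names here are purely illustrative:

```python
# Sketch: fetch a URL WITHOUT following redirects and report (status, Location),
# mirroring the idea of TestUtils.httpResponseCodeAndURL. Illustrative only.
import http.server
import threading
import urllib.error
import urllib.request

class _NoRedirect(urllib.request.HTTPRedirectHandler):
    # Returning None makes urllib surface the 3xx response instead of following it.
    def redirect_request(self, req, fp, code, msg, headers, newurl):
        return None

def http_response_code_and_url(url):
    opener = urllib.request.build_opener(_NoRedirect)
    try:
        resp = opener.open(url)
        return resp.status, None
    except urllib.error.HTTPError as e:  # with redirects disabled, a 3xx lands here
        return e.code, e.headers.get("Location")

class _Handler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        self.send_response(302)
        # A correct server redirects to its *bound* port, never to port 0.
        self.send_header("Location", f"https://localhost:{self.server.server_port}/")
        self.end_headers()
    def log_message(self, *args):  # silence request logging
        pass

server = http.server.HTTPServer(("127.0.0.1", 0), _Handler)  # port 0 = pick a free port
threading.Thread(target=server.serve_forever, daemon=True).start()

code, location = http_response_code_and_url(f"http://127.0.0.1:{server.server_port}/")
print(code, location)
server.shutdown()
```

A test written this way can assert that the redirect target carries the server's actual bound port, which is exactly the verification the extended UT in this commit performs.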
spark git commit: [SPARK-19276][CORE] Fetch Failure handling robust to user error handling
Repository: spark Updated Branches: refs/heads/master 433d9eb61 -> 8417a7ae6 [SPARK-19276][CORE] Fetch Failure handling robust to user error handling ## What changes were proposed in this pull request? Fault tolerance in Spark requires special handling of shuffle fetch failures. The Executor would catch FetchFailedException and send a special message back to the driver. However, intervening user code could intercept that exception, and wrap it with something else. This even happens in SparkSQL. So rather than checking the thrown exception only, we'll store the fetch failure directly in the TaskContext, where users can't touch it. ## How was this patch tested? Added a test case which failed before the fix. Full test suite via Jenkins. Author: Imran Rashid. Closes #16639 from squito/SPARK-19276. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8417a7ae Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8417a7ae Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8417a7ae Branch: refs/heads/master Commit: 8417a7ae6c0ea3fb8dc41bc492fc9513d1ad24af Parents: 433d9eb Author: Imran Rashid Authored: Thu Mar 2 16:46:01 2017 -0800 Committer: Kay Ousterhout Committed: Thu Mar 2 16:46:01 2017 -0800 -- .../scala/org/apache/spark/TaskContext.scala| 7 + .../org/apache/spark/TaskContextImpl.scala | 11 ++ .../org/apache/spark/executor/Executor.scala| 33 - .../scala/org/apache/spark/scheduler/Task.scala | 9 +- .../spark/shuffle/FetchFailedException.scala| 13 +- .../apache/spark/executor/ExecutorSuite.scala | 139 ++- project/MimaExcludes.scala | 3 + 7 files changed, 198 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8417a7ae/core/src/main/scala/org/apache/spark/TaskContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 0fd777e..f0867ec 100644 ---
a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -24,6 +24,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.source.Source +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.util.{AccumulatorV2, TaskCompletionListener, TaskFailureListener} @@ -190,4 +191,10 @@ abstract class TaskContext extends Serializable { */ private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit + /** + * Record that this task has failed due to a fetch failure from a remote host. This allows + * fetch-failure handling to get triggered by the driver, regardless of intervening user-code. + */ + private[spark] def setFetchFailed(fetchFailed: FetchFailedException): Unit + } http://git-wip-us.apache.org/repos/asf/spark/blob/8417a7ae/core/src/main/scala/org/apache/spark/TaskContextImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index c904e08..dc0d128 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -26,6 +26,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.util._ private[spark] class TaskContextImpl( @@ -56,6 +57,10 @@ private[spark] class TaskContextImpl( // Whether the task has failed. @volatile private var failed: Boolean = false + // If there was a fetch failure in the task, we store it here, to make sure user-code doesn't + // hide the exception. 
See SPARK-19276 + @volatile private var _fetchFailedException: Option[FetchFailedException] = None + override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = { onCompleteCallbacks += listener this @@ -126,4 +131,10 @@ private[spark] class TaskContextImpl( taskMetrics.registerAccumulator(a) } + private[spark] override def setFetchFailed(fetchFailed: FetchFailedException): Unit = { +this._fetchFailedException = Option(fetchFailed) + } + + private[spark] def fetchFailed: Option[FetchFailedException] = _fetchFailedException + }
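The mechanism in the diff above — record the root-cause fetch failure in task-local state the executor controls, so wrapping by user code cannot hide it — can be sketched in a few lines of Python. This is not Spark's API, just an illustration of the pattern with made-up names:

```python
# Sketch of the SPARK-19276 idea: the fetch failure is stored in a task-local
# context *before* the exception propagates, so the scheduler sees it even if
# user code swallows or wraps the exception. Illustrative names only.
class FetchFailedException(Exception):
    pass

class TaskContext:
    """Task-local state the executor controls and user code never touches."""
    def __init__(self):
        self._fetch_failed = None
    def set_fetch_failed(self, exc):
        self._fetch_failed = exc
    @property
    def fetch_failed(self):
        return self._fetch_failed

def read_shuffle_block(ctx):
    exc = FetchFailedException("failed to fetch shuffle block from remote host")
    ctx.set_fetch_failed(exc)  # recorded before the exception is thrown
    raise exc

def user_code(ctx):
    try:
        read_shuffle_block(ctx)
    except Exception as e:
        # Intervening user code wraps the real cause (this even happens in SparkSQL).
        raise RuntimeError("user-level failure") from e

def run_task(ctx):
    try:
        user_code(ctx)
        return "Success"
    except Exception:
        # The executor trusts the context, not the (possibly wrapped) exception.
        return "FetchFailed" if ctx.fetch_failed is not None else "ExceptionFailure"

result = run_task(TaskContext())
print(result)  # FetchFailed, despite the RuntimeError wrapper
```

Before this commit, the equivalent of `run_task` only inspected the thrown exception, so the `RuntimeError` wrapper would have masked the fetch failure and defeated the driver's fetch-failure handling.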
spark git commit: [SPARK-19631][CORE] OutputCommitCoordinator should not allow commits for already failed tasks
Repository: spark Updated Branches: refs/heads/master 5ae3516bf -> 433d9eb61 [SPARK-19631][CORE] OutputCommitCoordinator should not allow commits for already failed tasks ## What changes were proposed in this pull request? Previously it was possible for there to be a race between a task failure and committing the output of a task. For example, the driver may mark a task attempt as failed due to an executor heartbeat timeout (possibly due to GC), but the task attempt actually ends up coordinating with the OutputCommitCoordinator once the executor recovers and committing its result. This will lead to any retry attempt failing because the task result has already been committed despite the original attempt failing. This ensures that any previously failed task attempts cannot enter the commit protocol. ## How was this patch tested? Added a unit test Author: Patrick WoodyCloses #16959 from pwoody/pw/recordFailuresForCommitter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/433d9eb6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/433d9eb6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/433d9eb6 Branch: refs/heads/master Commit: 433d9eb6151a547af967cc1ac983a789bed60704 Parents: 5ae3516 Author: Patrick Woody Authored: Thu Mar 2 15:55:32 2017 -0800 Committer: Kay Ousterhout Committed: Thu Mar 2 15:55:32 2017 -0800 -- .../scheduler/OutputCommitCoordinator.scala | 59 .../OutputCommitCoordinatorSuite.scala | 11 2 files changed, 46 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/433d9eb6/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 08d220b..83d87b5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -48,25 +48,29 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) private type StageId = Int private type PartitionId = Int private type TaskAttemptNumber = Int - private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1 + private case class StageState(numPartitions: Int) { +val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER) +val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]() + } /** - * Map from active stages's id => partition id => task attempt with exclusive lock on committing - * output for that partition. + * Map from active stages's id => authorized task attempts for each partition id, which hold an + * exclusive lock on committing task output for that partition, as well as any known failed + * attempts in the stage. * * Entries are added to the top-level map when stages start and are removed they finish * (either successfully or unsuccessfully). * * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance. */ - private val authorizedCommittersByStage = mutable.Map[StageId, Array[TaskAttemptNumber]]() + private val stageStates = mutable.Map[StageId, StageState]() /** * Returns whether the OutputCommitCoordinator's internal data structures are all empty. */ def isEmpty: Boolean = { -authorizedCommittersByStage.isEmpty +stageStates.isEmpty } /** @@ -105,19 +109,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e. * the maximum possible value of `context.partitionId`). 
*/ - private[scheduler] def stageStart( - stage: StageId, - maxPartitionId: Int): Unit = { -val arr = new Array[TaskAttemptNumber](maxPartitionId + 1) -java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER) -synchronized { - authorizedCommittersByStage(stage) = arr -} + private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized { +stageStates(stage) = new StageState(maxPartitionId + 1) } // Called by DAGScheduler private[scheduler] def stageEnd(stage: StageId): Unit = synchronized { -authorizedCommittersByStage.remove(stage) +stageStates.remove(stage) } // Called by DAGScheduler @@ -126,7 +124,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver:
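The `StageState` refactor above pairs each partition's authorized committer with a set of known-failed attempts. A compact Python sketch of that arbitration logic — not Spark's `OutputCommitCoordinator`, just the same bookkeeping under illustrative names:

```python
# Sketch of SPARK-19631's commit arbitration: an attempt already marked failed
# can never acquire the commit lock, so a "zombie" executor that recovers after
# a heartbeat timeout cannot commit and block the retry. Illustrative only.
NO_AUTHORIZED_COMMITTER = -1

class StageState:
    def __init__(self, num_partitions):
        self.authorized_committers = [NO_AUTHORIZED_COMMITTER] * num_partitions
        self.failures = {}  # partition id -> set of failed attempt numbers

class OutputCommitCoordinator:
    def __init__(self):
        self._stage_states = {}

    def stage_start(self, stage, max_partition_id):
        self._stage_states[stage] = StageState(max_partition_id + 1)

    def attempt_failed(self, stage, partition, attempt):
        state = self._stage_states[stage]
        state.failures.setdefault(partition, set()).add(attempt)
        # Release the lock if the failed attempt held it, so a retry can commit.
        if state.authorized_committers[partition] == attempt:
            state.authorized_committers[partition] = NO_AUTHORIZED_COMMITTER

    def can_commit(self, stage, partition, attempt):
        state = self._stage_states[stage]
        if attempt in state.failures.get(partition, set()):
            return False  # the fix: failed attempts are barred from committing
        if state.authorized_committers[partition] == NO_AUTHORIZED_COMMITTER:
            state.authorized_committers[partition] = attempt
            return True
        return state.authorized_committers[partition] == attempt

coord = OutputCommitCoordinator()
coord.stage_start(stage=0, max_partition_id=0)
coord.attempt_failed(0, 0, 0)        # driver marked attempt 0 failed (GC timeout)
denied = coord.can_commit(0, 0, 0)   # zombie attempt 0 recovers and asks to commit
granted = coord.can_commit(0, 0, 1)  # the retry attempt
print(denied, granted)  # False True
```

Without the `failures` check, the timed-out attempt would win the race, and every retry would then fail because the output was "already committed" by an attempt the driver had given up on.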
spark git commit: [SPARK-19720][CORE] Redact sensitive information from SparkSubmit console
Repository: spark Updated Branches: refs/heads/master 9cca3dbf4 -> 5ae3516bf [SPARK-19720][CORE] Redact sensitive information from SparkSubmit console ## What changes were proposed in this pull request? This change redacts sensitive information (based on the `spark.redaction.regex` property) from the Spark Submit console logs. Such sensitive information is already being redacted from event logs and YARN logs, etc. ## How was this patch tested? Testing was done manually to make sure that the console logs were not printing any sensitive information. Here's some output from the console: ``` Spark properties used, including those specified through --conf and those from the properties file /etc/spark2/conf/spark-defaults.conf: (spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD,*(redacted)) (spark.authenticate,false) (spark.executorEnv.HADOOP_CREDSTORE_PASSWORD,*(redacted)) ``` ``` System properties: (spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD,*(redacted)) (spark.authenticate,false) (spark.executorEnv.HADOOP_CREDSTORE_PASSWORD,*(redacted)) ``` There is a risk that if new print statements are added to the console down the road, sensitive information may still get leaked, since there is no test that asserts on the console log output. I considered it out of the scope of this JIRA to write an integration test to make sure new leaks don't happen in the future. Running unit tests to make sure nothing else is broken by this change. Author: Mark Grover. Closes #17047 from markgrover/master_redaction.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ae3516b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ae3516b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ae3516b Branch: refs/heads/master Commit: 5ae3516bfb7716f1793eb76b4fdc720b31829d07 Parents: 9cca3db Author: Mark Grover Authored: Thu Mar 2 10:33:56 2017 -0800 Committer: Marcelo Vanzin Committed: Thu Mar 2 10:33:56 2017 -0800 -- .../org/apache/spark/deploy/SparkSubmit.scala | 3 ++- .../spark/deploy/SparkSubmitArguments.scala | 12 --- .../scala/org/apache/spark/util/Utils.scala | 21 +++- 3 files changed, 31 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5ae3516b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 5ffdedd..1e50eb6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -665,7 +665,8 @@ object SparkSubmit extends CommandLineUtils { if (verbose) { printStream.println(s"Main class:\n$childMainClass") printStream.println(s"Arguments:\n${childArgs.mkString("\n")}") - printStream.println(s"System properties:\n${sysProps.mkString("\n")}") + // sysProps may contain sensitive information, so redact before printing + printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}") printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}") printStream.println("\n") } http://git-wip-us.apache.org/repos/asf/spark/blob/5ae3516b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index dee7734..0614d80 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -84,9 +84,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S // scalastyle:off println if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile") Option(propertiesFile).foreach { filename => - Utils.getPropertiesFromFile(filename).foreach { case (k, v) => + val properties = Utils.getPropertiesFromFile(filename) + properties.foreach { case (k, v) => defaultProperties(k) = v -if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v") + } + // Property files may contain sensitive information, so redact before printing + if (verbose) { +Utils.redact(properties).foreach { case (k, v) => + SparkSubmit.printStream.println(s"Adding default property: $k=$v") +} } }
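The redaction applied above is key-based: any property whose *name* matches `spark.redaction.regex` has its value replaced before printing. A Python sketch of that idea — the default pattern and replacement text here mirror what Spark uses, but treat both as illustrative assumptions, not a copy of `Utils.redact`:

```python
# Sketch of key-based redaction in the spirit of Utils.redact. The pattern
# below assumes Spark's default spark.redaction.regex, "(?i)secret|password",
# and an illustrative replacement string.
import re

REDACTION_REGEX = re.compile(r"(?i)secret|password")
REDACTED = "*********(redacted)"

def redact(props, pattern=REDACTION_REGEX):
    """Replace the value of any (key, value) pair whose key matches the pattern."""
    return [(k, REDACTED if pattern.search(k) else v) for k, v in props]

props = [
    ("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", "hunter2"),
    ("spark.authenticate", "false"),
]
redacted = redact(props)
print(redacted)  # the credstore password is masked; other values pass through
```

Matching on keys rather than values is what lets the same `redact` call be reused for defaults-file properties, system properties, and event logs without knowing what a secret value looks like.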
spark git commit: [SPARK-19345][ML][DOC] Add doc for "coldStartStrategy" usage in ALS
Repository: spark Updated Branches: refs/heads/master 50c08e82f -> 9cca3dbf4 [SPARK-19345][ML][DOC] Add doc for "coldStartStrategy" usage in ALS [SPARK-14489](https://issues.apache.org/jira/browse/SPARK-14489) added the ability to skip `NaN` predictions during `ALSModel.transform`. This PR adds documentation for the `coldStartStrategy` param to the ALS user guide, and add code to the examples to illustrate usage. ## How was this patch tested? Doc and example change only. Build HTML doc locally and verified example code builds, and runs in shell for Scala/Python. Author: Nick PentreathCloses #17102 from MLnick/SPARK-19345-coldstart-doc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cca3dbf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cca3dbf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cca3dbf Branch: refs/heads/master Commit: 9cca3dbf4add9004a769dee1a556987e37230294 Parents: 50c08e8 Author: Nick Pentreath Authored: Thu Mar 2 15:51:16 2017 +0200 Committer: Nick Pentreath Committed: Thu Mar 2 15:51:16 2017 +0200 -- docs/ml-collaborative-filtering.md | 28 .../spark/examples/ml/JavaALSExample.java | 2 ++ examples/src/main/python/ml/als_example.py | 4 ++- .../apache/spark/examples/ml/ALSExample.scala | 2 ++ 4 files changed, 35 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9cca3dbf/docs/ml-collaborative-filtering.md -- diff --git a/docs/ml-collaborative-filtering.md b/docs/ml-collaborative-filtering.md index cfe8351..58f2d4b 100644 --- a/docs/ml-collaborative-filtering.md +++ b/docs/ml-collaborative-filtering.md @@ -59,6 +59,34 @@ This approach is named "ALS-WR" and discussed in the paper It makes `regParam` less dependent on the scale of the dataset, so we can apply the best parameter learned from a sampled subset to the full dataset and expect similar performance. 
+### Cold-start strategy + +When making predictions using an `ALSModel`, it is common to encounter users and/or items in the +test dataset that were not present during training the model. This typically occurs in two +scenarios: + +1. In production, for new users or items that have no rating history and on which the model has not +been trained (this is the "cold start problem"). +2. During cross-validation, the data is split between training and evaluation sets. When using +simple random splits as in Spark's `CrossValidator` or `TrainValidationSplit`, it is actually +very common to encounter users and/or items in the evaluation set that are not in the training set + +By default, Spark assigns `NaN` predictions during `ALSModel.transform` when a user and/or item +factor is not present in the model. This can be useful in a production system, since it indicates +a new user or item, and so the system can make a decision on some fallback to use as the prediction. + +However, this is undesirable during cross-validation, since any `NaN` predicted values will result +in `NaN` results for the evaluation metric (for example when using `RegressionEvaluator`). +This makes model selection impossible. + +Spark allows users to set the `coldStartStrategy` parameter +to "drop" in order to drop any rows in the `DataFrame` of predictions that contain `NaN` values. +The evaluation metric will then be computed over the non-`NaN` data and will be valid. +Usage of this parameter is illustrated in the example below. + +**Note:** currently the supported cold start strategies are "nan" (the default behavior mentioned +above) and "drop". Further strategies may be supported in future. 
+ **Examples** http://git-wip-us.apache.org/repos/asf/spark/blob/9cca3dbf/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java index 33ba668..81970b7 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java @@ -103,6 +103,8 @@ public class JavaALSExample { ALSModel model = als.fit(training); // Evaluate the model by computing the RMSE on the test data +// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics +model.setColdStartStrategy("drop"); Dataset predictions = model.transform(test); RegressionEvaluator evaluator = new RegressionEvaluator()
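The documentation added above explains *why* `coldStartStrategy="drop"` matters: a single `NaN` prediction poisons the whole evaluation metric. A tiny plain-Python illustration of that arithmetic (not PySpark; the RMSE helper here is hand-rolled for the example):

```python
# Why coldStartStrategy="drop" matters: one NaN prediction makes the whole
# RMSE NaN; dropping NaN rows first yields a valid metric over the rest.
import math

# (label, prediction); the last row is a cold-start user/item -> NaN prediction
predictions = [(3.0, 2.5), (4.0, 4.5), (5.0, float("nan"))]

def rmse(pairs):
    return math.sqrt(sum((l - p) ** 2 for l, p in pairs) / len(pairs))

naive = rmse(predictions)  # NaN propagates through the sum and the sqrt
dropped = rmse([(l, p) for l, p in predictions if not math.isnan(p)])
print(naive, dropped)  # nan 0.5
```

This is exactly the situation in cross-validation: with the "nan" strategy the evaluator returns `NaN` and model selection becomes impossible, while "drop" computes the metric over the non-`NaN` rows only.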
spark git commit: [SPARK-19766][SQL][BRANCH-2.0] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule
Repository: spark Updated Branches: refs/heads/branch-2.0 c9c45d97b -> e30fe1c6a [SPARK-19766][SQL][BRANCH-2.0] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule This PR fix for branch-2.0 Refer #17099 gatorsmile Author: Stan ZhaiCloses #17131 from stanzhai/fix-inner-join-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e30fe1c6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e30fe1c6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e30fe1c6 Branch: refs/heads/branch-2.0 Commit: e30fe1c6aa91f74cf3bed74f3be3eb69a6eaf1b4 Parents: c9c45d9 Author: Stan Zhai Authored: Thu Mar 2 04:24:43 2017 -0800 Committer: Herman van Hovell Committed: Thu Mar 2 04:24:43 2017 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 2 +- .../optimizer/FoldablePropagationSuite.scala| 14 .../resources/sql-tests/inputs/inner-join.sql | 17 + .../sql-tests/results/inner-join.sql.out| 67 4 files changed, 99 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e30fe1c6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3a71463..f3cf1f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -669,7 +669,7 @@ object FoldablePropagation extends Rule[LogicalPlan] { // join is not always picked from its children, but can also be null. // TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes // of outer join. 
-case j @ Join(_, _, Inner, _) => +case j @ Join(_, _, Inner, _) if !stop => j.transformExpressions(replaceFoldable) // We can fold the projections an expand holds. However expand changes the output columns http://git-wip-us.apache.org/repos/asf/spark/blob/e30fe1c6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala index bbef212..9d18827 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala @@ -131,6 +131,20 @@ class FoldablePropagationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("Propagate in inner join") { +val ta = testRelation.select('a, Literal(1).as('tag)) + .union(testRelation.select('a, Literal(2).as('tag))) + .subquery('ta) +val tb = testRelation.select('a, Literal(1).as('tag)) + .union(testRelation.select('a, Literal(2).as('tag))) + .subquery('tb) +val query = ta.join(tb, Inner, + Some("ta.a".attr === "tb.a".attr && "ta.tag".attr === "tb.tag".attr)) +val optimized = Optimize.execute(query.analyze) +val correctAnswer = query.analyze +comparePlans(optimized, correctAnswer) + } + test("Propagate in expand") { val c1 = Literal(1).as('a) val c2 = Literal(2).as('b) http://git-wip-us.apache.org/repos/asf/spark/blob/e30fe1c6/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql b/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql new file mode 100644 index 000..38739cb --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql @@ -0,0 +1,17 @@ +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS 
GROUPING(a); +CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a); +CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a); +CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a); + +CREATE TEMPORARY VIEW ta AS +SELECT a, 'a' AS tag FROM t1 +UNION ALL +SELECT a, 'b' AS tag FROM t2; + +CREATE TEMPORARY VIEW tb AS +SELECT a, 'a' AS tag FROM t3 +UNION ALL +SELECT a, 'b' AS tag FROM t4; + +-- SPARK-19766 Constant alias columns in INNER JOIN should not be folded by
spark git commit: [SPARK-19704][ML] AFTSurvivalRegression should support numeric censorCol
Repository: spark Updated Branches: refs/heads/master 625cfe09e -> 50c08e82f [SPARK-19704][ML] AFTSurvivalRegression should support numeric censorCol ## What changes were proposed in this pull request? make `AFTSurvivalRegression` support numeric censorCol ## How was this patch tested? existing tests and added tests Author: Zheng RuiFengCloses #17034 from zhengruifeng/aft_numeric_censor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50c08e82 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50c08e82 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50c08e82 Branch: refs/heads/master Commit: 50c08e82f011dd31b4ff7ff2b45fb9fb4c0e3231 Parents: 625cfe0 Author: Zheng RuiFeng Authored: Thu Mar 2 13:34:04 2017 +0200 Committer: Nick Pentreath Committed: Thu Mar 2 13:34:04 2017 +0200 -- .../ml/regression/AFTSurvivalRegression.scala | 6 ++-- .../ml/regression/IsotonicRegression.scala | 2 +- .../regression/AFTSurvivalRegressionSuite.scala | 34 +++- 3 files changed, 37 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/50c08e82/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 2f78dd3..4b36083 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -106,7 +106,7 @@ private[regression] trait AFTSurvivalRegressionParams extends Params fitting: Boolean): StructType = { SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT) if (fitting) { - SchemaUtils.checkColumnType(schema, $(censorCol), DoubleType) + SchemaUtils.checkNumericType(schema, $(censorCol)) SchemaUtils.checkNumericType(schema, $(labelCol)) } if 
(hasQuantilesCol) { @@ -200,8 +200,8 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S * and put it in an RDD with strong types. */ protected[ml] def extractAFTPoints(dataset: Dataset[_]): RDD[AFTPoint] = { -dataset.select(col($(featuresCol)), col($(labelCol)).cast(DoubleType), col($(censorCol))) - .rdd.map { +dataset.select(col($(featuresCol)), col($(labelCol)).cast(DoubleType), + col($(censorCol)).cast(DoubleType)).rdd.map { case Row(features: Vector, label: Double, censor: Double) => AFTPoint(features, label, censor) } http://git-wip-us.apache.org/repos/asf/spark/blob/50c08e82/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index a6c2943..529f66e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -49,7 +49,7 @@ private[regression] trait IsotonicRegressionBase extends Params with HasFeatures */ final val isotonic: BooleanParam = new BooleanParam(this, "isotonic", - "whether the output sequence should be isotonic/increasing (true) or" + + "whether the output sequence should be isotonic/increasing (true) or " + "antitonic/decreasing (false)") /** @group getParam */ http://git-wip-us.apache.org/repos/asf/spark/blob/50c08e82/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala index 0fdfdf3..3cd4b0a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala @@ -27,6 
+27,8 @@ import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.random.{ExponentialGenerator, WeibullGenerator} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.types._ class AFTSurvivalRegressionSuite extends SparkFunSuite with MLlibTestSparkContext
spark git commit: [SPARK-19733][ML] Removed unnecessary castings and refactored checked casts in ALS.
spark git commit: [SPARK-19733][ML] Removed unnecessary castings and refactored checked casts in ALS.

Repository: spark
Updated Branches:
  refs/heads/master 8d6ef895e -> 625cfe09e

[SPARK-19733][ML] Removed unnecessary castings and refactored checked casts in ALS.

## What changes were proposed in this pull request?

The original ALS was performing unnecessary casting to the user and item ids because the protected checkedCast() method required a double. I removed the castings and refactored the method to receive Any and efficiently handle all permitted numeric values.

## How was this patch tested?

I tested it by running the unit-tests and by manually validating the result of checkedCast for various legal and illegal values.

Author: Vasilis Vryniotis

Closes #17059 from datumbox/als_casting_fix.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/625cfe09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/625cfe09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/625cfe09

Branch: refs/heads/master
Commit: 625cfe09e673bfcb95e361ce19b534cf0a3c782c
Parents: 8d6ef89
Author: Vasilis Vryniotis
Authored: Thu Mar 2 12:37:42 2017 +0200
Committer: Nick Pentreath
Committed: Thu Mar 2 12:37:42 2017 +0200

--
 .../apache/spark/ml/recommendation/ALS.scala    | 31 +---
 .../spark/ml/recommendation/ALSSuite.scala      | 84 +---
 2 files changed, 95 insertions(+), 20 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/625cfe09/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 04273a4..799e881 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -80,14 +80,24 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo
   /**
    * Attempts to safely cast a user/item id to an Int. Throws an exception if the value is
-   * out of integer range.
+   * out of integer range or contains a fractional part.
    */
-  protected val checkedCast = udf { (n: Double) =>
-    if (n > Int.MaxValue || n < Int.MinValue) {
-      throw new IllegalArgumentException(s"ALS only supports values in Integer range for columns " +
-        s"${$(userCol)} and ${$(itemCol)}. Value $n was out of Integer range.")
-    } else {
-      n.toInt
+  protected[recommendation] val checkedCast = udf { (n: Any) =>
+    n match {
+      case v: Int => v // Avoid unnecessary casting
+      case v: Number =>
+        val intV = v.intValue
+        // Checks if number within Int range and has no fractional part.
+        if (v.doubleValue == intV) {
+          intV
+        } else {
+          throw new IllegalArgumentException(s"ALS only supports values in Integer range " +
+            s"and without fractional part for columns ${$(userCol)} and ${$(itemCol)}. " +
+            s"Value $n was either out of Integer range or contained a fractional part that " +
+            s"could not be converted.")
+        }
+      case _ => throw new IllegalArgumentException(s"ALS only supports values in Integer range " +
+        s"for columns ${$(userCol)} and ${$(itemCol)}. Value $n was not numeric.")
     }
   }
@@ -288,9 +298,9 @@ class ALSModel private[ml] (
     }
     val predictions = dataset
       .join(userFactors,
-        checkedCast(dataset($(userCol)).cast(DoubleType)) === userFactors("id"), "left")
+        checkedCast(dataset($(userCol))) === userFactors("id"), "left")
       .join(itemFactors,
-        checkedCast(dataset($(itemCol)).cast(DoubleType)) === itemFactors("id"), "left")
+        checkedCast(dataset($(itemCol))) === itemFactors("id"), "left")
       .select(dataset("*"),
         predict(userFactors("features"), itemFactors("features")).as($(predictionCol)))
     getColdStartStrategy match {
@@ -491,8 +501,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
     val r = if ($(ratingCol) != "") col($(ratingCol)).cast(FloatType) else lit(1.0f)
     val ratings = dataset
-      .select(checkedCast(col($(userCol)).cast(DoubleType)),
-        checkedCast(col($(itemCol)).cast(DoubleType)), r)
+      .select(checkedCast(col($(userCol))), checkedCast(col($(itemCol))), r)
       .rdd
       .map { row =>
         Rating(row.getInt(0), row.getInt(1), row.getFloat(2))

http://git-wip-us.apache.org/repos/asf/spark/blob/625cfe09/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
--
diff --git
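The refactored Scala UDF above accepts any numeric id and validates it in one pass: pass an `Int` through untouched, accept any other `Number` only if it fits in `Int` range and has no fractional part, and reject everything else. The same validation logic can be sketched in plain Python (the function name `checked_cast` and the error messages are illustrative, not Spark API):

```python
def checked_cast(n):
    """Validate a user/item id the way the ALS checkedCast UDF does.

    Returns the id as an int; raises ValueError for non-numeric values,
    values outside 32-bit Int range, or values with a fractional part
    (mirroring the `v.doubleValue == v.intValue` check in the Scala UDF).
    """
    INT_MIN, INT_MAX = -2**31, 2**31 - 1
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(n, bool) or not isinstance(n, (int, float)):
        raise ValueError(f"Value {n!r} was not numeric.")
    if n != int(n) or not (INT_MIN <= n <= INT_MAX):
        raise ValueError(
            f"Only values in Integer range without fractional part "
            f"are supported; value {n!r} could not be converted.")
    return int(n)
```

The interesting design point carried over from the commit is that a whole-number float like `3.0` is accepted (it carries no information loss), while `3.5` or `2**31` fails fast with a descriptive error instead of silently truncating.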
spark git commit: [SPARK-18352][DOCS] wholeFile JSON update doc and programming guide
Repository: spark
Updated Branches:
  refs/heads/master d2a879762 -> 8d6ef895e

[SPARK-18352][DOCS] wholeFile JSON update doc and programming guide

## What changes were proposed in this pull request?

Update doc for R, programming guide. Clarify default behavior for all languages.

## How was this patch tested?

manually

Author: Felix Cheung

Closes #17128 from felixcheung/jsonwholefiledoc.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d6ef895
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d6ef895
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d6ef895

Branch: refs/heads/master
Commit: 8d6ef895ee492b8febbaac7ab2ef2c907b48fa4a
Parents: d2a8797
Author: Felix Cheung
Authored: Thu Mar 2 01:02:38 2017 -0800
Committer: Felix Cheung
Committed: Thu Mar 2 01:02:38 2017 -0800

--
 R/pkg/R/SQLContext.R                            | 10 +---
 docs/sql-programming-guide.md                   | 26 +++-
 python/pyspark/sql/readwriter.py                |  4 +--
 python/pyspark/sql/streaming.py                 |  4 +--
 .../org/apache/spark/sql/DataFrameReader.scala  |  4 +--
 .../spark/sql/streaming/DataStreamReader.scala  |  4 +--
 6 files changed, 30 insertions(+), 22 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/8d6ef895/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index e771a05..8354f70 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -332,8 +332,10 @@ setMethod("toDF", signature(x = "RDD"),
 #' Create a SparkDataFrame from a JSON file.
 #'
-#' Loads a JSON file (\href{http://jsonlines.org/}{JSON Lines text format or newline-delimited JSON}
-#' ), returning the result as a SparkDataFrame
+#' Loads a JSON file, returning the result as a SparkDataFrame
+#' By default, (\href{http://jsonlines.org/}{JSON Lines text format or newline-delimited JSON}
+#' ) is supported. For JSON (one record per file), set a named property \code{wholeFile} to
+#' \code{TRUE}.
 #' It goes through the entire dataset once to determine the schema.
 #'
 #' @param path Path of file to read. A vector of multiple paths is allowed.
@@ -346,6 +348,7 @@ setMethod("toDF", signature(x = "RDD"),
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
+#' df <- read.json(path, wholeFile = TRUE)
 #' df <- jsonFile(path)
 #' }
 #' @name read.json
@@ -778,6 +781,7 @@ dropTempView <- function(viewName) {
 #' @return SparkDataFrame
 #' @rdname read.df
 #' @name read.df
+#' @seealso \link{read.json}
 #' @export
 #' @examples
 #'\dontrun{
@@ -785,7 +789,7 @@ dropTempView <- function(viewName) {
 #' df1 <- read.df("path/to/file.json", source = "json")
 #' schema <- structType(structField("name", "string"),
 #'                      structField("info", "map<string,double>"))
-#' df2 <- read.df(mapTypeJsonPath, "json", schema)
+#' df2 <- read.df(mapTypeJsonPath, "json", schema, wholeFile = TRUE)
 #' df3 <- loadDF("data/test_table", "parquet", mergeSchema = "true")
 #' }
 #' @name read.df

http://git-wip-us.apache.org/repos/asf/spark/blob/8d6ef895/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 2dd1ab6..b077575 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -386,8 +386,8 @@ For example:

 The [built-in DataFrames functions](api/scala/index.html#org.apache.spark.sql.functions$) provide common aggregations such as `count()`, `countDistinct()`, `avg()`, `max()`, `min()`, etc.
-While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in 
-[Scala](api/scala/index.html#org.apache.spark.sql.expressions.scalalang.typed$) and 
+While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in
+[Scala](api/scala/index.html#org.apache.spark.sql.expressions.scalalang.typed$) and
 [Java](api/java/org/apache/spark/sql/expressions/javalang/typed.html) to work with strongly typed Datasets.
 Moreover, users are not limited to the predefined aggregate functions and can create their own.
@@ -397,7 +397,7 @@ Moreover, users are not limited to the predefined aggregate functions and can cr

-Users have to extend the [UserDefinedAggregateFunction](api/scala/index.html#org.apache.spark.sql.expressions.UserDefinedAggregateFunction) 
+Users have to extend the [UserDefinedAggregateFunction](api/scala/index.html#org.apache.spark.sql.expressions.UserDefinedAggregateFunction)
 abstract class to implement a custom untyped
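The SPARK-18352 doc change above hinges on the difference between the two JSON file layouts: by default Spark's JSON reader expects JSON Lines (one complete JSON record per line), while the wholeFile option reads one JSON value per file. The distinction can be made concrete with a stdlib-only sketch (no Spark involved; the sample records are invented):

```python
import json

# Default layout (JSON Lines / newline-delimited JSON):
# each line is a complete, self-contained JSON record.
json_lines = '{"name": "alice", "age": 30}\n{"name": "bob", "age": 25}'
records = [json.loads(line) for line in json_lines.splitlines()]

# "Whole file" layout: a single (possibly pretty-printed, multi-line)
# JSON value spanning the entire file -- this is what wholeFile enables.
whole_file = """
[
  {"name": "alice", "age": 30},
  {"name": "bob", "age": 25}
]
"""
whole_records = json.loads(whole_file)

# Both layouts encode the same data; only the framing differs.
assert records == whole_records
```

A line-by-line reader applied to the pretty-printed document would fail on the first line (`[` is not a complete record), which is exactly why the docs now spell out the default and the opt-in flag.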