spark git commit: [SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code
Repository: spark
Updated Branches: refs/heads/branch-2.0 91dffcabd -> f0fa0a894

[SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code

## What changes were proposed in this pull request?

If there is a bug in whole-stage codegen and the generated code cannot be compiled, we should fall back to non-codegen execution to make sure the query can still run. The batch mode of the new parquet reader depends on codegen and cannot easily be switched to non-batch mode, so we still use codegen for the batched scan (for parquet). Because the batched scan only supports primitive types and requires the number of columns to be less than spark.sql.codegen.maxFields (100), it should not fail. This behavior is configurable via `spark.sql.codegen.fallback`.

## How was this patch tested?

Manually tested with a buggy operator; it worked well.

Author: Davies Liu

Closes #13501 from davies/codegen_fallback.

(cherry picked from commit 7504bc73f20fe0e6546a019ed91c3fd3804287ba)
Signed-off-by: Davies Liu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0fa0a89
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0fa0a89
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0fa0a89
Branch: refs/heads/branch-2.0
Commit: f0fa0a8946fb4bdf0f4697a8e389f49e98422871
Parents: 91dffca
Author: Davies Liu
Authored: Fri Jun 10 21:12:06 2016 -0700
Committer: Davies Liu
Committed: Fri Jun 10 21:12:15 2016 -0700

--
.../org/apache/spark/sql/execution/ExistingRDD.scala | 5 -
.../spark/sql/execution/WholeStageCodegenExec.scala | 11 ++-
.../scala/org/apache/spark/sql/internal/SQLConf.scala| 11 ++-
3 files changed, 24 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f0fa0a89/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 9ab98fd1..ee72a70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -234,7 +234,10 @@ private[sql] case class BatchedDataSourceScanExec( "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) protected override def doExecute(): RDD[InternalRow] = { -throw new UnsupportedOperationException +// in the case of fallback, this batched scan should never fail because of: +// 1) only primitive types are supported +// 2) the number of columns should be smaller than spark.sql.codegen.maxFields +WholeStageCodegenExec(this).execute() } override def simpleString: String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/f0fa0a89/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index e0d8e35..ac4c3aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoi import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils /** * An interface for those physical operators that support codegen.
@@ -339,12 +340,20 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co new CodeAndComment(CodeFormatter.stripExtraNewLines(source), ctx.getPlaceHolderToComments())) logDebug(s"\n${CodeFormatter.format(cleanedSource)}") -CodeGenerator.compile(cleanedSource) (ctx, cleanedSource) } override def doExecute(): RDD[InternalRow] = { val (ctx, cleanedSource) = doCodeGen() +// try to compile and fallback if it failed +try { + CodeGenerator.compile(cleanedSource) +} catch { + case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback => +// We should already saw the error message +logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString") +return child.execute() +} val references = ctx.references.toArray val durationMs = longMetric("pipelineTime") http://git-wip-us.apache.org/repos/asf/spark/blob/f0fa0a89/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -
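A minimal sketch of how the new switch is used, assuming a local SparkSession (the session setup and query below are illustrative; the config key `spark.sql.codegen.fallback` comes from the patch above):

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch, assuming a local session. The fallback introduced by this
// patch is controlled by spark.sql.codegen.fallback (on by default in this
// patch); setting it to false surfaces codegen compilation failures instead
// of silently falling back to non-codegen execution, which helps when
// debugging codegen bugs.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("CodegenFallbackDemo")
  .config("spark.sql.codegen.fallback", "false")
  .getOrCreate()

// Any whole-stage-codegen query exercises the compile-then-maybe-fallback path.
spark.range(1000).selectExpr("sum(id)").show()
```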
spark git commit: [SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code
Repository: spark
Updated Branches: refs/heads/master 468da03e2 -> 7504bc73f

[SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code

## What changes were proposed in this pull request?

If there is a bug in whole-stage codegen and the generated code cannot be compiled, we should fall back to non-codegen execution to make sure the query can still run. The batch mode of the new parquet reader depends on codegen and cannot easily be switched to non-batch mode, so we still use codegen for the batched scan (for parquet). Because the batched scan only supports primitive types and requires the number of columns to be less than spark.sql.codegen.maxFields (100), it should not fail. This behavior is configurable via `spark.sql.codegen.fallback`.

## How was this patch tested?

Manually tested with a buggy operator; it worked well.

Author: Davies Liu

Closes #13501 from davies/codegen_fallback.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7504bc73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7504bc73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7504bc73
Branch: refs/heads/master
Commit: 7504bc73f20fe0e6546a019ed91c3fd3804287ba
Parents: 468da03
Author: Davies Liu
Authored: Fri Jun 10 21:12:06 2016 -0700
Committer: Davies Liu
Committed: Fri Jun 10 21:12:06 2016 -0700

--
.../org/apache/spark/sql/execution/ExistingRDD.scala | 5 -
.../spark/sql/execution/WholeStageCodegenExec.scala | 11 ++-
.../scala/org/apache/spark/sql/internal/SQLConf.scala| 11 ++-
3 files changed, 24 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 9ab98fd1..ee72a70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -234,7 +234,10 @@ private[sql] case class BatchedDataSourceScanExec( "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) protected override def doExecute(): RDD[InternalRow] = { -throw new UnsupportedOperationException +// in the case of fallback, this batched scan should never fail because of: +// 1) only primitive types are supported +// 2) the number of columns should be smaller than spark.sql.codegen.maxFields +WholeStageCodegenExec(this).execute() } override def simpleString: String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index e0d8e35..ac4c3aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoi import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils /** * An interface for those physical operators that support codegen.
@@ -339,12 +340,20 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co new CodeAndComment(CodeFormatter.stripExtraNewLines(source), ctx.getPlaceHolderToComments())) logDebug(s"\n${CodeFormatter.format(cleanedSource)}") -CodeGenerator.compile(cleanedSource) (ctx, cleanedSource) } override def doExecute(): RDD[InternalRow] = { val (ctx, cleanedSource) = doCodeGen() +// try to compile and fallback if it failed +try { + CodeGenerator.compile(cleanedSource) +} catch { + case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback => +// We should already saw the error message +logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString") +return child.execute() +} val references = ctx.references.toArray val durationMs = longMetric("pipelineTime") http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apach
spark git commit: Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader"
Repository: spark Updated Branches: refs/heads/branch-2.0 a08715c7a -> 91dffcabd Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader" This reverts commit 7d6bd1196410563bd1fccc10e7bff6e75b5c9f22. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91dffcab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91dffcab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91dffcab Branch: refs/heads/branch-2.0 Commit: 91dffcabdecd4ab651024c027cf9716664084e1e Parents: a08715c Author: Cheng Lian Authored: Fri Jun 10 20:45:27 2016 -0700 Committer: Cheng Lian Committed: Fri Jun 10 20:45:27 2016 -0700 -- .../catalyst/expressions/namedExpressions.scala | 8 --- .../datasources/FileSourceStrategy.scala| 9 +-- .../datasources/parquet/ParquetFileFormat.scala | 61 ++-- 3 files changed, 57 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/91dffcab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index c06a1ea..306a99d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -292,14 +292,6 @@ case class AttributeReference( } } - def withMetadata(newMetadata: Metadata): AttributeReference = { -if (metadata == newMetadata) { - this -} else { - AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated) -} - } - override protected final def otherCopyArgs: Seq[AnyRef] = { exprId :: qualifier :: isGenerated :: Nil } http://git-wip-us.apache.org/repos/asf/spark/blob/91dffcab/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 7fc842f..13a86bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -84,14 +84,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") val dataColumns = -l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c => - files.dataSchema.find(_.name == c.name).map { f => -c match { - case a: AttributeReference => a.withMetadata(f.metadata) - case _ => c -} - }.getOrElse(c) -} +l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver) // Partition keys are not available in the statistics of the files. 
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty) http://git-wip-us.apache.org/repos/asf/spark/blob/91dffcab/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index bc4a9de..3735c94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -357,11 +357,6 @@ private[sql] class ParquetFileFormat val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) - // Try to push down filters when filter push-down is enabled. - // Notice: This push-down is RowGroups level, not individual records. - pushed.foreach { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _) - } val parquetReader = if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader() vectorizedReader.initialize(split, hadoopAttemptContext) @@ -597,6 +592,62 @@ private[sql
spark git commit: [SPARK-15678] Add support to REFRESH data source paths
Repository: spark
Updated Branches: refs/heads/master 8e7b56f3d -> 468da03e2

[SPARK-15678] Add support to REFRESH data source paths

## What changes were proposed in this pull request?

Spark currently incorrectly continues to use cached data even if the underlying data is overwritten. Current behavior:

```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <--- We are still using the cached dataset
```

This patch fixes this bug by adding support for `REFRESH path`, which invalidates and refreshes all the cached data (and the associated metadata) for any dataframe that contains the given data source path. Expected behavior:

```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
sqlContext.read.parquet(dir).count() // outputs 10 <--- We are not using the cached dataset
```

## How was this patch tested?

Unit tests for overwrites and appends in `ParquetQuerySuite` and `CachedTableSuite`.

Author: Sameer Agarwal

Closes #13566 from sameeragarwal/refresh-path-2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/468da03e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/468da03e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/468da03e
Branch: refs/heads/master
Commit: 468da03e23a01e02718608f05d778386cbb8416b
Parents: 8e7b56f
Author: Sameer Agarwal
Authored: Fri Jun 10 20:43:18 2016 -0700
Committer: Davies Liu
Committed: Fri Jun 10 20:43:18 2016 -0700

--
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 1 +
.../org/apache/spark/sql/catalog/Catalog.scala | 7 +++
.../spark/sql/execution/CacheManager.scala | 51 +++-
.../spark/sql/execution/SparkSqlParser.scala| 9 +++-
.../spark/sql/execution/datasources/ddl.scala | 9 
.../apache/spark/sql/internal/CatalogImpl.scala | 10 
.../datasources/parquet/ParquetQuerySuite.scala | 28 +++
.../spark/sql/hive/CachedTableSuite.scala | 45 +
8 files changed, 158 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index d102559..044f910 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -113,6 +113,7 @@ statement | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)? tableIdentifier partitionSpec? describeColName? #describeTable | REFRESH TABLE tableIdentifier #refreshTable +| REFRESH .*? #refreshResource | CACHE LAZY? TABLE identifier (AS? query)?
#cacheTable | UNCACHE TABLE identifier #uncacheTable | CLEAR CACHE #clearCache http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 6ddb1a7..083a63c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -226,4 +226,11 @@ abstract class Catalog { */ def refreshTable(tableName: String): Unit + /** + * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that + * contains the given data source path. + * + * @since 2.0.0 + */ + def refreshByPath(path: String): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scal
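A short usage sketch of the two entry points this commit adds (the path and session are assumptions; the SQL form goes through the new `refreshResource` grammar rule, the programmatic form through the new `Catalog.refreshByPath`):

```scala
// A usage sketch, assuming an active SparkSession `spark` and parquet data
// under /tmp/test. Both forms invalidate and refresh any cached plan that
// reads from the given path.
spark.sql("REFRESH /tmp/test")           // SQL statement added by the new grammar rule
spark.catalog.refreshByPath("/tmp/test") // programmatic API added to Catalog
```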
spark git commit: [SPARK-15678] Add support to REFRESH data source paths
Repository: spark
Updated Branches: refs/heads/branch-2.0 798825c09 -> a08715c7a

[SPARK-15678] Add support to REFRESH data source paths

## What changes were proposed in this pull request?

Spark currently incorrectly continues to use cached data even if the underlying data is overwritten. Current behavior:

```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <--- We are still using the cached dataset
```

This patch fixes this bug by adding support for `REFRESH path`, which invalidates and refreshes all the cached data (and the associated metadata) for any dataframe that contains the given data source path. Expected behavior:

```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
sqlContext.read.parquet(dir).count() // outputs 10 <--- We are not using the cached dataset
```

## How was this patch tested?

Unit tests for overwrites and appends in `ParquetQuerySuite` and `CachedTableSuite`.

Author: Sameer Agarwal

Closes #13566 from sameeragarwal/refresh-path-2.

(cherry picked from commit 468da03e23a01e02718608f05d778386cbb8416b)
Signed-off-by: Davies Liu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a08715c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a08715c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a08715c7
Branch: refs/heads/branch-2.0
Commit: a08715c7a79ce1953b8d64a9cf0ec1c513d56eec
Parents: 798825c
Author: Sameer Agarwal
Authored: Fri Jun 10 20:43:18 2016 -0700
Committer: Davies Liu
Committed: Fri Jun 10 20:43:26 2016 -0700

--
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 1 +
.../org/apache/spark/sql/catalog/Catalog.scala | 7 +++
.../spark/sql/execution/CacheManager.scala | 51 +++-
.../spark/sql/execution/SparkSqlParser.scala| 9 +++-
.../spark/sql/execution/datasources/ddl.scala | 9 
.../apache/spark/sql/internal/CatalogImpl.scala | 10 
.../datasources/parquet/ParquetQuerySuite.scala | 28 +++
.../spark/sql/hive/CachedTableSuite.scala | 45 +
8 files changed, 158 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index d102559..044f910 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -113,6 +113,7 @@ statement | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)? tableIdentifier partitionSpec? describeColName? #describeTable | REFRESH TABLE tableIdentifier #refreshTable +| REFRESH .*? #refreshResource | CACHE LAZY? TABLE identifier (AS? query)?
#cacheTable | UNCACHE TABLE identifier #uncacheTable | CLEAR CACHE #clearCache http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 6ddb1a7..083a63c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -226,4 +226,11 @@ abstract class Catalog { */ def refreshTable(tableName: String): Unit + /** + * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that + * contains the given data source path. + * + * @since 2.0.0 + */ + def refreshByPath(path: String): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql
spark git commit: Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader"
Repository: spark Updated Branches: refs/heads/master 99f3c8277 -> 8e7b56f3d Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader" This reverts commit bba5d7999f7b3ae9d816ea552ba9378fea1615a6. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e7b56f3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e7b56f3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e7b56f3 Branch: refs/heads/master Commit: 8e7b56f3d4917692d3ff44d91aa264738a6fc2ed Parents: 99f3c82 Author: Cheng Lian Authored: Fri Jun 10 20:41:48 2016 -0700 Committer: Cheng Lian Committed: Fri Jun 10 20:41:48 2016 -0700 -- .../catalyst/expressions/namedExpressions.scala | 8 --- .../datasources/FileSourceStrategy.scala| 9 +-- .../datasources/parquet/ParquetFileFormat.scala | 61 ++-- 3 files changed, 57 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index c06a1ea..306a99d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -292,14 +292,6 @@ case class AttributeReference( } } - def withMetadata(newMetadata: Metadata): AttributeReference = { -if (metadata == newMetadata) { - this -} else { - AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated) -} - } - override protected final def otherCopyArgs: Seq[AnyRef] = { exprId :: qualifier :: isGenerated :: Nil } http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 7fc842f..13a86bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -84,14 +84,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") val dataColumns = -l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c => - files.dataSchema.find(_.name == c.name).map { f => -c match { - case a: AttributeReference => a.withMetadata(f.metadata) - case _ => c -} - }.getOrElse(c) -} +l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver) // Partition keys are not available in the statistics of the files. 
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty) http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index bc4a9de..3735c94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -357,11 +357,6 @@ private[sql] class ParquetFileFormat val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) - // Try to push down filters when filter push-down is enabled. - // Notice: This push-down is RowGroups level, not individual records. - pushed.foreach { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _) - } val parquetReader = if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader() vectorizedReader.initialize(split, hadoopAttemptContext) @@ -597,6 +592,62 @@ private[sql] object
spark git commit: [SPARK-14615][ML][FOLLOWUP] Fix Python examples to use the new ML Vector and Matrix APIs in the ML pipeline based algorithms
Repository: spark
Updated Branches: refs/heads/branch-2.0 7d6bd1196 -> 798825c09

[SPARK-14615][ML][FOLLOWUP] Fix Python examples to use the new ML Vector and Matrix APIs in the ML pipeline based algorithms

## What changes were proposed in this pull request?

This PR fixes the Python examples to use the new ML Vector and Matrix APIs in the ML pipeline based algorithms. I first listed the affected examples with the shell command `grep -r "from pyspark.mllib" .` and then executed them all. Some of the tests in `ml` produced error messages like the one below:

```
pyspark.sql.utils.IllegalArgumentException: u'requirement failed: Input type must be VectorUDT but got org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.'
```

So I fixed them to use the new APIs, in the same way as the Python tests fixed in https://github.com/apache/spark/pull/12627

## How was this patch tested?

Manually tested all the examples listed by `grep -r "from pyspark.mllib" .`.

Author: hyukjinkwon

Closes #13393 from HyukjinKwon/SPARK-14615.

(cherry picked from commit 99f3c82776fe5ea4f89a9965a288c7447585dc2c)
Signed-off-by: Joseph K. Bradley

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/798825c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/798825c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/798825c0
Branch: refs/heads/branch-2.0
Commit: 798825c09ba55dca449bde3f00ff2aeafd6b05b7
Parents: 7d6bd11
Author: hyukjinkwon
Authored: Fri Jun 10 18:29:26 2016 -0700
Committer: Joseph K. Bradley
Committed: Fri Jun 10 18:29:37 2016 -0700

--
.../main/python/ml/aft_survival_regression.py| 2 +-
.../src/main/python/ml/chisq_selector_example.py | 2 +-
examples/src/main/python/ml/dct_example.py | 2 +-
.../python/ml/elementwise_product_example.py | 2 +-
.../ml/estimator_transformer_param_example.py| 2 +-
examples/src/main/python/ml/pca_example.py | 2 +-
.../python/ml/polynomial_expansion_example.py| 2 +-
.../src/main/python/ml/simple_params_example.py | 19 +--
.../main/python/ml/vector_assembler_example.py | 2 +-
.../src/main/python/ml/vector_slicer_example.py | 2 +-
10 files changed, 18 insertions(+), 19 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/798825c0/examples/src/main/python/ml/aft_survival_regression.py
--
diff --git a/examples/src/main/python/ml/aft_survival_regression.py b/examples/src/main/python/ml/aft_survival_regression.py index 9879679..060f017 100644 --- a/examples/src/main/python/ml/aft_survival_regression.py +++ b/examples/src/main/python/ml/aft_survival_regression.py @@ -19,7 +19,7 @@ from __future__ import print_function # $example on$ from pyspark.ml.regression import AFTSurvivalRegression -from pyspark.mllib.linalg import Vectors +from pyspark.ml.linalg import Vectors # $example off$ from pyspark.sql import SparkSession

http://git-wip-us.apache.org/repos/asf/spark/blob/798825c0/examples/src/main/python/ml/chisq_selector_example.py
--
diff --git a/examples/src/main/python/ml/chisq_selector_example.py b/examples/src/main/python/ml/chisq_selector_example.py index 8bafb94..5e19ef1 100644 --- a/examples/src/main/python/ml/chisq_selector_example.py +++ b/examples/src/main/python/ml/chisq_selector_example.py @@ -20,7 +20,7 @@ from __future__ import print_function from pyspark.sql import SparkSession # $example on$ from pyspark.ml.feature import ChiSqSelector -from pyspark.mllib.linalg import Vectors +from pyspark.ml.linalg import Vectors # $example off$ if __name__ == "__main__":
http://git-wip-us.apache.org/repos/asf/spark/blob/798825c0/examples/src/main/python/ml/dct_example.py -- diff --git a/examples/src/main/python/ml/dct_example.py b/examples/src/main/python/ml/dct_example.py index e36fcde..a4f25df 100644 --- a/examples/src/main/python/ml/dct_example.py +++ b/examples/src/main/python/ml/dct_example.py @@ -19,7 +19,7 @@ from __future__ import print_function # $example on$ from pyspark.ml.feature import DCT -from pyspark.mllib.linalg import Vectors +from pyspark.ml.linalg import Vectors # $example off$ from pyspark.sql import SparkSession http://git-wip-us.apache.org/repos/asf/spark/blob/798825c0/examples/src/main/python/ml/elementwise_product_example.py -- diff --git a/examples/src/main/python/ml/elementwise_product_example.py b/examples/src/main/python/ml/elementwise_product_example.py index 41727ed..598deae 100644 --- a/examples/src/main/python/ml/elementwise_product_example.py +++ b/examples/src/main/python/ml/elementwise_product_exa
spark git commit: [SPARK-14615][ML][FOLLOWUP] Fix Python examples to use the new ML Vector and Matrix APIs in the ML pipeline based algorithms
Repository: spark
Updated Branches: refs/heads/master bba5d7999 -> 99f3c8277

[SPARK-14615][ML][FOLLOWUP] Fix Python examples to use the new ML Vector and Matrix APIs in the ML pipeline based algorithms

## What changes were proposed in this pull request?

This PR fixes the Python examples to use the new ML Vector and Matrix APIs in the ML pipeline based algorithms. I first listed the affected examples with the shell command `grep -r "from pyspark.mllib" .` and then executed them all. Some of the tests in `ml` produced error messages like the one below:

```
pyspark.sql.utils.IllegalArgumentException: u'requirement failed: Input type must be VectorUDT but got org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.'
```

So I fixed them to use the new APIs, in the same way as the Python tests fixed in https://github.com/apache/spark/pull/12627

## How was this patch tested?

Manually tested all the examples listed by `grep -r "from pyspark.mllib" .`.

Author: hyukjinkwon

Closes #13393 from HyukjinKwon/SPARK-14615.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99f3c827
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99f3c827
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99f3c827
Branch: refs/heads/master
Commit: 99f3c82776fe5ea4f89a9965a288c7447585dc2c
Parents: bba5d79
Author: hyukjinkwon
Authored: Fri Jun 10 18:29:26 2016 -0700
Committer: Joseph K. Bradley
Committed: Fri Jun 10 18:29:26 2016 -0700

--
.../main/python/ml/aft_survival_regression.py| 2 +-
.../src/main/python/ml/chisq_selector_example.py | 2 +-
examples/src/main/python/ml/dct_example.py | 2 +-
.../python/ml/elementwise_product_example.py | 2 +-
.../ml/estimator_transformer_param_example.py| 2 +-
examples/src/main/python/ml/pca_example.py | 2 +-
.../python/ml/polynomial_expansion_example.py| 2 +-
.../src/main/python/ml/simple_params_example.py | 19 +--
.../main/python/ml/vector_assembler_example.py | 2 +-
.../src/main/python/ml/vector_slicer_example.py | 2 +-
10 files changed, 18 insertions(+), 19 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/99f3c827/examples/src/main/python/ml/aft_survival_regression.py
--
diff --git a/examples/src/main/python/ml/aft_survival_regression.py b/examples/src/main/python/ml/aft_survival_regression.py index 9879679..060f017 100644 --- a/examples/src/main/python/ml/aft_survival_regression.py +++ b/examples/src/main/python/ml/aft_survival_regression.py @@ -19,7 +19,7 @@ from __future__ import print_function # $example on$ from pyspark.ml.regression import AFTSurvivalRegression -from pyspark.mllib.linalg import Vectors +from pyspark.ml.linalg import Vectors # $example off$ from pyspark.sql import SparkSession

http://git-wip-us.apache.org/repos/asf/spark/blob/99f3c827/examples/src/main/python/ml/chisq_selector_example.py
--
diff --git a/examples/src/main/python/ml/chisq_selector_example.py b/examples/src/main/python/ml/chisq_selector_example.py index 8bafb94..5e19ef1 100644 --- a/examples/src/main/python/ml/chisq_selector_example.py +++ b/examples/src/main/python/ml/chisq_selector_example.py @@ -20,7 +20,7 @@ from __future__ import print_function from pyspark.sql import SparkSession # $example on$ from pyspark.ml.feature import ChiSqSelector -from pyspark.mllib.linalg import Vectors +from pyspark.ml.linalg import Vectors # $example off$ if __name__ == "__main__":

http://git-wip-us.apache.org/repos/asf/spark/blob/99f3c827/examples/src/main/python/ml/dct_example.py
--
diff --git a/examples/src/main/python/ml/dct_example.py
b/examples/src/main/python/ml/dct_example.py index e36fcde..a4f25df 100644 --- a/examples/src/main/python/ml/dct_example.py +++ b/examples/src/main/python/ml/dct_example.py @@ -19,7 +19,7 @@ from __future__ import print_function # $example on$ from pyspark.ml.feature import DCT -from pyspark.mllib.linalg import Vectors +from pyspark.ml.linalg import Vectors # $example off$ from pyspark.sql import SparkSession http://git-wip-us.apache.org/repos/asf/spark/blob/99f3c827/examples/src/main/python/ml/elementwise_product_example.py -- diff --git a/examples/src/main/python/ml/elementwise_product_example.py b/examples/src/main/python/ml/elementwise_product_example.py index 41727ed..598deae 100644 --- a/examples/src/main/python/ml/elementwise_product_example.py +++ b/examples/src/main/python/ml/elementwise_product_example.py @@ -19,7 +19,7 @@ from __future__ import print_function # $example on$ from pyspark.ml.feature impor
spark git commit: [SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader
Repository: spark
Updated Branches: refs/heads/branch-2.0 0a450cfff -> 7d6bd1196

[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader

## What changes were proposed in this pull request?

The base class `SpecificParquetRecordReaderBase`, used by the vectorized parquet reader, tries to get pushed-down filters from the given configuration. These pushed-down filters are used for RowGroups-level filtering. However, we never set the filters up in the configuration, so they are not actually pushed down to do RowGroups-level filtering. This patch fixes that by setting up the filters in the configuration passed to the reader.

## How was this patch tested?

Existing tests should pass.

Author: Liang-Chi Hsieh

Closes #13371 from viirya/vectorized-reader-push-down-filter.

(cherry picked from commit bba5d7999f7b3ae9d816ea552ba9378fea1615a6)
Signed-off-by: Cheng Lian

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d6bd119
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d6bd119
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d6bd119
Branch: refs/heads/branch-2.0
Commit: 7d6bd1196410563bd1fccc10e7bff6e75b5c9f22
Parents: 0a450cf
Author: Liang-Chi Hsieh
Authored: Fri Jun 10 18:23:59 2016 -0700
Committer: Cheng Lian
Committed: Fri Jun 10 18:24:06 2016 -0700

--
.../catalyst/expressions/namedExpressions.scala | 8 +++
.../datasources/FileSourceStrategy.scala| 9 ++-
.../datasources/parquet/ParquetFileFormat.scala | 61 ++--
3 files changed, 21 insertions(+), 57 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/7d6bd119/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 306a99d..c06a1ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -292,6 +292,14 @@ case class AttributeReference( } } + def withMetadata(newMetadata: Metadata): AttributeReference = { +if (metadata == newMetadata) { + this +} else { + AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated) +} + } + override protected final def otherCopyArgs: Seq[AnyRef] = { exprId :: qualifier :: isGenerated :: Nil }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d6bd119/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 13a86bf..7fc842f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -84,7 +84,14 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") val dataColumns = -l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver) +l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c => +
files.dataSchema.find(_.name == c.name).map { f => +c match { + case a: AttributeReference => a.withMetadata(f.metadata) + case _ => c +} + }.getOrElse(c) +} // Partition keys are not available in the statistics of the files. val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty) http://git-wip-us.apache.org/repos/asf/spark/blob/7d6bd119/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 3735c94..bc4a9de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spar
spark git commit: [SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader
Repository: spark
Updated Branches: refs/heads/master 54f758b5f -> bba5d7999

[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader

## What changes were proposed in this pull request?

The base class `SpecificParquetRecordReaderBase`, used by the vectorized parquet reader, tries to get pushed-down filters from the given configuration. These pushed-down filters are used for RowGroups-level filtering. However, we never set the filters up in the configuration, so they are not actually pushed down to do RowGroups-level filtering. This patch fixes that by setting up the filters in the configuration passed to the reader.

## How was this patch tested?

Existing tests should pass.

Author: Liang-Chi Hsieh

Closes #13371 from viirya/vectorized-reader-push-down-filter.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bba5d799
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bba5d799
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bba5d799
Branch: refs/heads/master
Commit: bba5d7999f7b3ae9d816ea552ba9378fea1615a6
Parents: 54f758b
Author: Liang-Chi Hsieh
Authored: Fri Jun 10 18:23:59 2016 -0700
Committer: Cheng Lian
Committed: Fri Jun 10 18:23:59 2016 -0700

--
.../catalyst/expressions/namedExpressions.scala | 8 +++
.../datasources/FileSourceStrategy.scala| 9 ++-
.../datasources/parquet/ParquetFileFormat.scala | 61 ++--
3 files changed, 21 insertions(+), 57 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/bba5d799/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 306a99d..c06a1ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -292,6 +292,14 @@ case class AttributeReference( } } + def withMetadata(newMetadata: Metadata): AttributeReference = { +if (metadata == newMetadata) { + this +} else { + AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated) +} + } + override protected final def otherCopyArgs: Seq[AnyRef] = { exprId :: qualifier :: isGenerated :: Nil }

http://git-wip-us.apache.org/repos/asf/spark/blob/bba5d799/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 13a86bf..7fc842f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -84,7 +84,14 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") val dataColumns = -l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver) +l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c => + files.dataSchema.find(_.name == c.name).map { f => +c match { + case a: AttributeReference =>
a.withMetadata(f.metadata) + case _ => c +} + }.getOrElse(c) +} // Partition keys are not available in the statistics of the files. val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty) http://git-wip-us.apache.org/repos/asf/spark/blob/bba5d799/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 3735c94..bc4a9de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -357,6 +357,11 @@ private[sql] class Parqu
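For orientation, a small sketch of the user-visible setup this push-down operates under (the data path and column are assumptions; `spark.sql.parquet.filterPushdown` is the existing switch that gates predicate push-down):

```scala
import org.apache.spark.sql.SparkSession

// A sketch, assuming parquet data at /tmp/events with a numeric `id` column.
// With filter push-down enabled (the default), the predicate below is handed
// to the parquet reader, which, with this patch, can also skip whole row
// groups whose statistics rule the predicate out.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("RowGroupFilterDemo")
  .getOrCreate()
spark.conf.set("spark.sql.parquet.filterPushdown", "true")

val filtered = spark.read.parquet("/tmp/events").filter("id > 500")
filtered.explain() // the scan node lists the predicate under PushedFilters
```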
spark git commit: [SPARK-15884][SPARKR][SQL] Overriding stringArgs in MapPartitionsInR
Repository: spark
Updated Branches: refs/heads/branch-2.0 f41f433b1 -> 0a450cfff

[SPARK-15884][SPARKR][SQL] Overriding stringArgs in MapPartitionsInR

## What changes were proposed in this pull request?

As discussed in https://github.com/apache/spark/pull/12836, we need to override the stringArgs method in MapPartitionsInR in order to avoid the overly large strings that the "stringArgs" method would otherwise generate from the input arguments. In this case we exclude some of the input arguments: the serialized R objects.

## How was this patch tested?

Existing test cases.

Author: Narine Kokhlikyan

Closes #13610 from NarineK/dapply_MapPartitionsInR_stringArgs.

(cherry picked from commit 54f758b5fc60ecb0da6b191939a72ef5829be38c)
Signed-off-by: Cheng Lian

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a450cff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a450cff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a450cff
Branch: refs/heads/branch-2.0
Commit: 0a450cfffada67f841795a09af3bf6320343b358
Parents: f41f433
Author: Narine Kokhlikyan
Authored: Fri Jun 10 17:17:47 2016 -0700
Committer: Cheng Lian
Committed: Fri Jun 10 17:17:57 2016 -0700

--
.../org/apache/spark/sql/catalyst/plans/logical/object.scala | 3 +++
1 file changed, 3 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/0a450cff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index 55d8adf..78e8822 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -141,6 +141,9 @@ case class MapPartitionsInR( outputObjAttr: Attribute, child: LogicalPlan) extends ObjectConsumer with ObjectProducer { override lazy val schema = outputSchema + + override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, outputSchema, +outputObjAttr, child) } object MapElements {
spark git commit: [SPARK-15884][SPARKR][SQL] Overriding stringArgs in MapPartitionsInR
Repository: spark
Updated Branches: refs/heads/master 2022afe57 -> 54f758b5f

[SPARK-15884][SPARKR][SQL] Overriding stringArgs in MapPartitionsInR

## What changes were proposed in this pull request?

As discussed in https://github.com/apache/spark/pull/12836, we need to override the stringArgs method in MapPartitionsInR in order to avoid the overly large strings that the "stringArgs" method would otherwise generate from the input arguments. In this case we exclude some of the input arguments: the serialized R objects.

## How was this patch tested?

Existing test cases.

Author: Narine Kokhlikyan

Closes #13610 from NarineK/dapply_MapPartitionsInR_stringArgs.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54f758b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54f758b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54f758b5
Branch: refs/heads/master
Commit: 54f758b5fc60ecb0da6b191939a72ef5829be38c
Parents: 2022afe
Author: Narine Kokhlikyan
Authored: Fri Jun 10 17:17:47 2016 -0700
Committer: Cheng Lian
Committed: Fri Jun 10 17:17:47 2016 -0700

--
.../org/apache/spark/sql/catalyst/plans/logical/object.scala | 3 +++
1 file changed, 3 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/54f758b5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index 55d8adf..78e8822 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -141,6 +141,9 @@ case class MapPartitionsInR( outputObjAttr: Attribute, child: LogicalPlan) extends ObjectConsumer with ObjectProducer { override lazy val schema = outputSchema + + override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, outputSchema, +outputObjAttr, child) } object MapElements {
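The same technique applies to any plan node that carries a bulky payload; a hypothetical sketch (the node and field names below are illustrative, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.types.StructType

// Hypothetical node: `payload` may hold megabytes of serialized data, so it
// is excluded from the arguments TreeNode uses to render the plan string.
case class OpaquePayloadNode(
    payload: Array[Byte],
    outputSchema: StructType) extends LeafNode {
  override def output: Seq[Attribute] = outputSchema.toAttributes
  // Only the schema appears in toString/treeString; `payload` is skipped.
  override protected def stringArgs: Iterator[Any] = Iterator(outputSchema)
}
```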
spark git commit: [SPARK-15773][CORE][EXAMPLE] Avoid creating local variable `sc` in examples if possible
Repository: spark
Updated Branches: refs/heads/branch-2.0 e6ebb547b -> f41f433b1

[SPARK-15773][CORE][EXAMPLE] Avoid creating local variable `sc` in examples if possible

## What changes were proposed in this pull request?

Instead of using a local variable `sc` as in the following example, this PR uses `spark.sparkContext`. This makes the examples more concise, and also fixes a misleading pattern, i.e., creating a SparkContext from a SparkSession.

```
-println("Creating SparkContext")
-val sc = spark.sparkContext
-
 println("Writing local file to DFS")
 val dfsFilename = dfsDirPath + "/dfs_read_write_test"
-val fileRDD = sc.parallelize(fileContents)
+val fileRDD = spark.sparkContext.parallelize(fileContents)
```

This will change 12 files (+30 lines, -52 lines).

## How was this patch tested?

Manual.

Author: Dongjoon Hyun

Closes #13520 from dongjoon-hyun/SPARK-15773.

(cherry picked from commit 2022afe57dbf8cb0c9909399962c4a3649e0601c)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f41f433b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f41f433b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f41f433b
Branch: refs/heads/branch-2.0
Commit: f41f433b101d5eac5bdd3a033e15f69e2215d30d
Parents: e6ebb54
Author: Dongjoon Hyun
Authored: Fri Jun 10 15:40:29 2016 -0700
Committer: Reynold Xin
Committed: Fri Jun 10 15:40:35 2016 -0700

--
examples/src/main/python/pi.py | 4 +---
examples/src/main/python/transitive_closure.py | 4 +---
.../apache/spark/examples/DFSReadWriteTest.scala| 7 ++-
.../spark/examples/ExceptionHandlingTest.scala | 3 +--
.../org/apache/spark/examples/GroupByTest.scala | 14 ++
.../apache/spark/examples/MultiBroadcastTest.scala | 8 +++-
.../spark/examples/SimpleSkewedGroupByTest.scala| 16 +++-
.../apache/spark/examples/SkewedGroupByTest.scala | 13 +
.../scala/org/apache/spark/examples/SparkLR.scala | 4 +---
.../scala/org/apache/spark/examples/SparkPi.scala | 3 +--
.../scala/org/apache/spark/examples/SparkTC.scala | 3 +--
.../spark/examples/sql/hive/HiveFromSpark.scala | 3 +--
12 files changed, 30 insertions(+), 52 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f41f433b/examples/src/main/python/pi.py
--
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py index b39d710..e3f0c4a 100755 --- a/examples/src/main/python/pi.py +++ b/examples/src/main/python/pi.py @@ -32,8 +32,6 @@ if __name__ == "__main__": .appName("PythonPi")\ .getOrCreate() -sc = spark.sparkContext - partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 n = 10 * partitions @@ -42,7 +40,7 @@ if __name__ == "__main__": y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 < 1 else 0 -count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add) +count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print("Pi is roughly %f" % (4.0 * count / n)) spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/f41f433b/examples/src/main/python/transitive_closure.py
--
diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py index d88ea94..49551d4 100755 --- a/examples/src/main/python/transitive_closure.py +++ b/examples/src/main/python/transitive_closure.py @@ -46,10 +46,8 @@ if __name__ == "__main__": .appName("PythonTransitiveClosure")\ .getOrCreate() -sc = spark.sparkContext - partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 -tc = sc.parallelize(generateGraph(), partitions).cache() +tc =
spark.sparkContext.parallelize(generateGraph(), partitions).cache() # Linear transitive closure: each round grows paths by one edge, # by joining the graph's edges with the already-discovered paths. http://git-wip-us.apache.org/repos/asf/spark/blob/f41f433b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 4b5e36c..3bff7ce 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -107,16 +107,13 @@ object DFSReadWriteTest { .appName("DFS Read Write Test") .getOrCreate() -pri
spark git commit: [SPARK-15773][CORE][EXAMPLE] Avoid creating local variable `sc` in examples if possible
Repository: spark
Updated Branches: refs/heads/master 127a6678d -> 2022afe57

[SPARK-15773][CORE][EXAMPLE] Avoid creating local variable `sc` in examples if possible

## What changes were proposed in this pull request?

Instead of using a local variable `sc` as in the following example, this PR uses `spark.sparkContext`. This makes the examples more concise, and also fixes a misleading pattern, i.e., creating a SparkContext from a SparkSession.

```
-println("Creating SparkContext")
-val sc = spark.sparkContext
-
 println("Writing local file to DFS")
 val dfsFilename = dfsDirPath + "/dfs_read_write_test"
-val fileRDD = sc.parallelize(fileContents)
+val fileRDD = spark.sparkContext.parallelize(fileContents)
```

This will change 12 files (+30 lines, -52 lines).

## How was this patch tested?

Manual.

Author: Dongjoon Hyun

Closes #13520 from dongjoon-hyun/SPARK-15773.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2022afe5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2022afe5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2022afe5
Branch: refs/heads/master
Commit: 2022afe57dbf8cb0c9909399962c4a3649e0601c
Parents: 127a667
Author: Dongjoon Hyun
Authored: Fri Jun 10 15:40:29 2016 -0700
Committer: Reynold Xin
Committed: Fri Jun 10 15:40:29 2016 -0700

--
examples/src/main/python/pi.py | 4 +---
examples/src/main/python/transitive_closure.py | 4 +---
.../apache/spark/examples/DFSReadWriteTest.scala| 7 ++-
.../spark/examples/ExceptionHandlingTest.scala | 3 +--
.../org/apache/spark/examples/GroupByTest.scala | 14 ++
.../apache/spark/examples/MultiBroadcastTest.scala | 8 +++-
.../spark/examples/SimpleSkewedGroupByTest.scala| 16 +++-
.../apache/spark/examples/SkewedGroupByTest.scala | 13 +
.../scala/org/apache/spark/examples/SparkLR.scala | 4 +---
.../scala/org/apache/spark/examples/SparkPi.scala | 3 +--
.../scala/org/apache/spark/examples/SparkTC.scala | 3 +--
.../spark/examples/sql/hive/HiveFromSpark.scala | 3 +--
12 files changed, 30 insertions(+), 52 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/python/pi.py
--
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py index b39d710..e3f0c4a 100755 --- a/examples/src/main/python/pi.py +++ b/examples/src/main/python/pi.py @@ -32,8 +32,6 @@ if __name__ == "__main__": .appName("PythonPi")\ .getOrCreate() -sc = spark.sparkContext - partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 n = 10 * partitions @@ -42,7 +40,7 @@ if __name__ == "__main__": y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 < 1 else 0 -count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add) +count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print("Pi is roughly %f" % (4.0 * count / n)) spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/python/transitive_closure.py
--
diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py index d88ea94..49551d4 100755 --- a/examples/src/main/python/transitive_closure.py +++ b/examples/src/main/python/transitive_closure.py @@ -46,10 +46,8 @@ if __name__ == "__main__": .appName("PythonTransitiveClosure")\ .getOrCreate() -sc = spark.sparkContext - partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 -tc = sc.parallelize(generateGraph(), partitions).cache() +tc = spark.sparkContext.parallelize(generateGraph(), partitions).cache() # Linear transitive closure: each round
grows paths by one edge, # by joining the graph's edges with the already-discovered paths. http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 4b5e36c..3bff7ce 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -107,16 +107,13 @@ object DFSReadWriteTest { .appName("DFS Read Write Test") .getOrCreate() -println("Creating SparkContext") -val sc = spark.sparkContext - println("Writing local file to DFS")
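For readers skimming the diffs, here is a minimal, self-contained Scala sketch of the idiom this patch standardizes on; the app name and numbers are illustrative, not from the patch:

```scala
import org.apache.spark.sql.SparkSession

object SparkContextIdiom {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkContextIdiom")
      .master("local[*]")
      .getOrCreate()

    // Preferred: reach the context through the session. Binding it to a
    // local `sc` suggests, misleadingly, that a separate SparkContext is
    // being created from the SparkSession.
    val rdd = spark.sparkContext.parallelize(1 to 10, 2)
    println(rdd.sum())

    spark.stop()
  }
}
```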
spark git commit: [SPARK-15489][SQL] Dataset kryo encoder won't load custom user settings
Repository: spark Updated Branches: refs/heads/master aec502d91 -> 127a6678d [SPARK-15489][SQL] Dataset kryo encoder won't load custom user settings ## What changes were proposed in this pull request? Serializer instantiation will consider the existing SparkConf (taken from the running SparkEnv) instead of always creating a new one. ## How was this patch tested? Manual test with `ImmutableList` (Guava) and `kryo-serializers`'s `Immutable*Serializer` implementations. Added a test suite. Author: Sela Closes #13424 from amitsela/SPARK-15489. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/127a6678 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/127a6678 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/127a6678 Branch: refs/heads/master Commit: 127a6678d7af6b5164a115be7c64525bb80001fe Parents: aec502d Author: Sela Authored: Fri Jun 10 14:36:51 2016 -0700 Committer: Michael Armbrust Committed: Fri Jun 10 14:36:51 2016 -0700 -- .../catalyst/expressions/objects/objects.scala | 30 ++--- .../sql/DatasetSerializerRegistratorSuite.scala | 68 2 files changed, 89 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/127a6678/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 87c8a2e..c597a2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -22,7 +22,7 @@ import java.lang.reflect.Modifier import scala.language.existentials import scala.reflect.ClassTag -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.serializer._ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow @@ -547,11 +547,17 @@ case class EncodeUsingSerializer(child: Expression, kryo: Boolean) (classOf[JavaSerializer].getName, classOf[JavaSerializerInstance].getName) } } +// try conf from env, otherwise create a new one +val env = s"${classOf[SparkEnv].getName}.get()" val sparkConf = s"new ${classOf[SparkConf].getName}()" -ctx.addMutableState( - serializerInstanceClass, - serializer, - s"$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();") +val serializerInit = s""" + if ($env == null) { +$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance(); + } else { + $serializer = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance(); + } + """ +ctx.addMutableState(serializerInstanceClass, serializer, serializerInit) // Code to serialize.
val input = child.genCode(ctx) @@ -587,11 +593,17 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B (classOf[JavaSerializer].getName, classOf[JavaSerializerInstance].getName) } } +// try conf from env, otherwise create a new one +val env = s"${classOf[SparkEnv].getName}.get()" val sparkConf = s"new ${classOf[SparkConf].getName}()" -ctx.addMutableState( - serializerInstanceClass, - serializer, - s"$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();") +val serializerInit = s""" + if ($env == null) { +$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance(); + } else { + $serializer = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance(); + } + """ +ctx.addMutableState(serializerInstanceClass, serializer, serializerInit) // Code to deserialize. val input = child.genCode(ctx) http://git-wip-us.apache.org/repos/asf/spark/blob/127a6678/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala new file mode 100644 index 000..0f3d0ce --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreem
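To make the fix concrete, here is a sketch of the scenario the patch enables, modeled loosely on the new test suite: a custom Kryo registrator set through SparkConf that the kryo encoder should now pick up. The class names and the payload type are illustrative, not taken from the patch:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// A payload class with no product/bean structure, so it needs the kryo encoder.
class Payload(val value: Int) extends Serializable

// Custom registrator; with this patch, the encoder's generated code reads
// the conf from the running SparkEnv, so these registrations are honored.
class PayloadRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Payload])
  }
}

object KryoEncoderSettings {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KryoEncoderSettings")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrator", classOf[PayloadRegistrator].getName)
      .getOrCreate()

    implicit val payloadEnc: Encoder[Payload] = Encoders.kryo[Payload]
    val ds = spark.createDataset(Seq(new Payload(1), new Payload(2)))(payloadEnc)
    println(ds.map(_.value)(Encoders.scalaInt).collect().mkString(","))
    spark.stop()
  }
}
```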
spark git commit: [SPARK-15489][SQL] Dataset kryo encoder won't load custom user settings
Repository: spark Updated Branches: refs/heads/branch-2.0 bc53422ad -> e6ebb547b [SPARK-15489][SQL] Dataset kryo encoder won't load custom user settings ## What changes were proposed in this pull request? Serializer instantiation will consider the existing SparkConf (taken from the running SparkEnv) instead of always creating a new one. ## How was this patch tested? Manual test with `ImmutableList` (Guava) and `kryo-serializers`'s `Immutable*Serializer` implementations. Added a test suite. Author: Sela Closes #13424 from amitsela/SPARK-15489. (cherry picked from commit 127a6678d7af6b5164a115be7c64525bb80001fe) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6ebb547 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6ebb547 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6ebb547 Branch: refs/heads/branch-2.0 Commit: e6ebb547b197f906b9706847ad871b337b4a9e7f Parents: bc53422 Author: Sela Authored: Fri Jun 10 14:36:51 2016 -0700 Committer: Michael Armbrust Committed: Fri Jun 10 14:36:59 2016 -0700 -- .../catalyst/expressions/objects/objects.scala | 30 ++--- .../sql/DatasetSerializerRegistratorSuite.scala | 68 2 files changed, 89 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e6ebb547/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 87c8a2e..c597a2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -22,7 +22,7 @@ import java.lang.reflect.Modifier import scala.language.existentials import scala.reflect.ClassTag -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.serializer._ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow @@ -547,11 +547,17 @@ case class EncodeUsingSerializer(child: Expression, kryo: Boolean) (classOf[JavaSerializer].getName, classOf[JavaSerializerInstance].getName) } } +// try conf from env, otherwise create a new one +val env = s"${classOf[SparkEnv].getName}.get()" val sparkConf = s"new ${classOf[SparkConf].getName}()" -ctx.addMutableState( - serializerInstanceClass, - serializer, - s"$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();") +val serializerInit = s""" + if ($env == null) { +$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance(); + } else { + $serializer = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance(); + } + """ +ctx.addMutableState(serializerInstanceClass, serializer, serializerInit) // Code to serialize.
val input = child.genCode(ctx) @@ -587,11 +593,17 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B (classOf[JavaSerializer].getName, classOf[JavaSerializerInstance].getName) } } +// try conf from env, otherwise create a new one +val env = s"${classOf[SparkEnv].getName}.get()" val sparkConf = s"new ${classOf[SparkConf].getName}()" -ctx.addMutableState( - serializerInstanceClass, - serializer, - s"$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();") +val serializerInit = s""" + if ($env == null) { +$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance(); + } else { + $serializer = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance(); + } + """ +ctx.addMutableState(serializerInstanceClass, serializer, serializerInit) // Code to deserialize. val input = child.genCode(ctx) http://git-wip-us.apache.org/repos/asf/spark/blob/e6ebb547/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala new file mode 100644 index 000..0f3d0ce --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala @@ -0,0 +1,
spark git commit: [SPARK-15654] [SQL] fix non-splitable files for text based file formats
Repository: spark Updated Branches: refs/heads/branch-2.0 f2e5d6d0f -> bc53422ad [SPARK-15654] [SQL] fix non-splitable files for text based file formats ## What changes were proposed in this pull request? Currently, we always split files when they are bigger than maxSplitBytes, but Hadoop's LineRecordReader does not respect the splits for compressed files correctly, so we should have an API for FileFormat to check whether a file can be split or not. This PR is based on #13442, closes #13442 ## How was this patch tested? Added regression tests. Author: Davies Liu Closes #13531 from davies/fix_split. (cherry picked from commit aec502d9114ad8e18bfbbd63f38780e076d326d1) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc53422a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc53422a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc53422a Branch: refs/heads/branch-2.0 Commit: bc53422ad54460069f0e36061c6be5ef76b4dbaa Parents: f2e5d6d Author: Davies Liu Authored: Fri Jun 10 14:32:43 2016 -0700 Committer: Davies Liu Committed: Fri Jun 10 14:32:53 2016 -0700 -- .../spark/ml/source/libsvm/LibSVMRelation.scala | 2 +- .../datasources/FileSourceStrategy.scala| 17 ++--- .../datasources/csv/CSVFileFormat.scala | 2 +- .../datasources/fileSourceInterfaces.scala | 33 +++-- .../datasources/json/JsonFileFormat.scala | 2 +- .../datasources/parquet/ParquetFileFormat.scala | 7  .../datasources/text/TextFileFormat.scala | 2 +- .../datasources/FileSourceStrategySuite.scala | 37 +++- .../execution/datasources/text/TextSuite.scala | 17 + .../spark/sql/hive/orc/OrcFileFormat.scala | 7  .../spark/sql/sources/SimpleTextRelation.scala | 2 +- 11 files changed, 115 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc53422a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index 7629369..b5b2a68 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -112,7 +112,7 @@ private[libsvm] class LibSVMOutputWriter( */ // If this is moved or renamed, please update DataSource's backwardCompatibilityMap.
@Since("1.6.0") -class LibSVMFileFormat extends FileFormat with DataSourceRegister { +class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister { @Since("1.6.0") override def shortName(): String = "libsvm" http://git-wip-us.apache.org/repos/asf/spark/blob/bc53422a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 7503285..13a86bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -151,11 +151,18 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val splitFiles = selectedPartitions.flatMap { partition => partition.files.flatMap { file => val blockLocations = getBlockLocations(file) - (0L until file.getLen by maxSplitBytes).map { offset => -val remaining = file.getLen - offset -val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining -val hosts = getBlockHosts(blockLocations, offset, size) -PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size, hosts) + if (files.fileFormat.isSplitable(files.sparkSession, files.options, file.getPath)) { +(0L until file.getLen by maxSplitBytes).map { offset => + val remaining = file.getLen - offset + val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining + val hosts = getBlockHosts(blockLocations, offset, size) + PartitionedFile( +partition.values, file.getPath.toUri.toString, offset, size, hosts) +} + } else { +val hosts = getBlockHosts(blockLocations, 0, file.getLen) +
spark git commit: [SPARK-15654] [SQL] fix non-splitable files for text based file formats
Repository: spark Updated Branches: refs/heads/master e05a2feeb -> aec502d91 [SPARK-15654] [SQL] fix non-splitable files for text based file formats ## What changes were proposed in this pull request? Currently, we always split files when they are bigger than maxSplitBytes, but Hadoop's LineRecordReader does not respect the splits for compressed files correctly, so we should have an API for FileFormat to check whether a file can be split or not. This PR is based on #13442, closes #13442 ## How was this patch tested? Added regression tests. Author: Davies Liu Closes #13531 from davies/fix_split. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aec502d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aec502d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aec502d9 Branch: refs/heads/master Commit: aec502d9114ad8e18bfbbd63f38780e076d326d1 Parents: e05a2fe Author: Davies Liu Authored: Fri Jun 10 14:32:43 2016 -0700 Committer: Davies Liu Committed: Fri Jun 10 14:32:43 2016 -0700 -- .../spark/ml/source/libsvm/LibSVMRelation.scala | 2 +- .../datasources/FileSourceStrategy.scala| 17 ++--- .../datasources/csv/CSVFileFormat.scala | 2 +- .../datasources/fileSourceInterfaces.scala | 33 +++-- .../datasources/json/JsonFileFormat.scala | 2 +- .../datasources/parquet/ParquetFileFormat.scala | 7  .../datasources/text/TextFileFormat.scala | 2 +- .../datasources/FileSourceStrategySuite.scala | 37 +++- .../execution/datasources/text/TextSuite.scala | 17 + .../spark/sql/hive/orc/OrcFileFormat.scala | 7  .../spark/sql/sources/SimpleTextRelation.scala | 2 +- 11 files changed, 115 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index 7629369..b5b2a68 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -112,7 +112,7 @@ private[libsvm] class LibSVMOutputWriter( */ // If this is moved or renamed, please update DataSource's backwardCompatibilityMap.
@Since("1.6.0") -class LibSVMFileFormat extends FileFormat with DataSourceRegister { +class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister { @Since("1.6.0") override def shortName(): String = "libsvm" http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 7503285..13a86bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -151,11 +151,18 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val splitFiles = selectedPartitions.flatMap { partition => partition.files.flatMap { file => val blockLocations = getBlockLocations(file) - (0L until file.getLen by maxSplitBytes).map { offset => -val remaining = file.getLen - offset -val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining -val hosts = getBlockHosts(blockLocations, offset, size) -PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size, hosts) + if (files.fileFormat.isSplitable(files.sparkSession, files.options, file.getPath)) { +(0L until file.getLen by maxSplitBytes).map { offset => + val remaining = file.getLen - offset + val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining + val hosts = getBlockHosts(blockLocations, offset, size) + PartitionedFile( +partition.values, file.getPath.toUri.toString, offset, size, hosts) +} + } else { +val hosts = getBlockHosts(blockLocations, 0, file.getLen) +Seq(PartitionedFile( + partition.values, file.getPath.toUri.toString, 0, file.getL
spark git commit: [SPARK-15825] [SQL] Fix SMJ invalid results
Repository: spark Updated Branches: refs/heads/master 026eb9064 -> e05a2feeb [SPARK-15825] [SQL] Fix SMJ invalid results ## What changes were proposed in this pull request? Code-generated `SortMergeJoin` returned wrong results when using structs as keys. This could (eventually) be traced back to the use of a wrong row reference when comparing structs. ## How was this patch tested? TBD Author: Herman van Hovell Closes #13589 from hvanhovell/SPARK-15822. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e05a2fee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e05a2fee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e05a2fee Branch: refs/heads/master Commit: e05a2feebe928df691d5a8f42f22e088c6263dcf Parents: 026eb90 Author: Herman van Hovell Authored: Fri Jun 10 14:29:05 2016 -0700 Committer: Davies Liu Committed: Fri Jun 10 14:29:05 2016 -0700 -- .../catalyst/expressions/codegen/CodeGenerator.scala | 1 + .../spark/sql/execution/joins/InnerJoinSuite.scala | 15 +++ 2 files changed, 16 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e05a2fee/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 9657f26..ca20292 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -490,6 +490,7 @@ class CodegenContext { addNewFunction(compareFunc, funcCode) s"this.$compareFunc($c1, $c2)" case schema: StructType => + INPUT_ROW = "i" val comparisons = GenerateOrdering.genComparisons(this, schema) val compareFunc = freshName("compareStruct") val funcCode: String = http://git-wip-us.apache.org/repos/asf/spark/blob/e05a2fee/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 27f6abc..35dab63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -271,4 +271,19 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { ) ) } + + { +def df: DataFrame = spark.range(3).selectExpr("struct(id, id) as key", "id as value") +lazy val left = df.selectExpr("key", "concat('L', value) as value").alias("left") +lazy val right = df.selectExpr("key", "concat('R', value) as value").alias("right") +testInnerJoin( + "SPARK-15822 - test structs as keys", + left, + right, + () => (left.col("key") === right.col("key")).expr, + Seq( +(Row(0, 0), "L0", Row(0, 0), "R0"), +(Row(1, 1), "L1", Row(1, 1), "R1"), +(Row(2, 2), "L2", Row(2, 2), "R2"))) + } }
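A spark-shell sketch of the pattern the new test exercises; the broadcast threshold is disabled here only to force a sort-merge join on a tiny input, and the column names are illustrative:

```scala
// Force SortMergeJoin instead of a broadcast join for this tiny input.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val df = spark.range(3).selectExpr("struct(id, id) as key", "id as value")
val left = df.selectExpr("key", "concat('L', value) as lv")
val right = df.selectExpr("key", "concat('R', value) as rv")

// Before the fix, the generated struct comparison could read from the
// wrong row; with it, each key pairs L<i> with the matching R<i>.
left.join(right, "key").orderBy("lv").show()
```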
spark git commit: [SPARK-15825] [SQL] Fix SMJ invalid results
Repository: spark Updated Branches: refs/heads/branch-2.0 80b8711b3 -> f2e5d6d0f [SPARK-15825] [SQL] Fix SMJ invalid results ## What changes were proposed in this pull request? Code-generated `SortMergeJoin` returned wrong results when using structs as keys. This could (eventually) be traced back to the use of a wrong row reference when comparing structs. ## How was this patch tested? TBD Author: Herman van Hovell Closes #13589 from hvanhovell/SPARK-15822. (cherry picked from commit e05a2feebe928df691d5a8f42f22e088c6263dcf) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2e5d6d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2e5d6d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2e5d6d0 Branch: refs/heads/branch-2.0 Commit: f2e5d6d0f446d7f1d6d8c3208871074abd669482 Parents: 80b8711 Author: Herman van Hovell Authored: Fri Jun 10 14:29:05 2016 -0700 Committer: Davies Liu Committed: Fri Jun 10 14:31:40 2016 -0700 -- .../catalyst/expressions/codegen/CodeGenerator.scala | 1 + .../spark/sql/execution/joins/InnerJoinSuite.scala | 15 +++ 2 files changed, 16 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f2e5d6d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 9657f26..ca20292 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -490,6 +490,7 @@ class CodegenContext { addNewFunction(compareFunc, funcCode) s"this.$compareFunc($c1, $c2)" case schema: StructType => + INPUT_ROW = "i" val comparisons = GenerateOrdering.genComparisons(this, schema) val compareFunc = freshName("compareStruct") val funcCode: String = http://git-wip-us.apache.org/repos/asf/spark/blob/f2e5d6d0/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 27f6abc..35dab63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -271,4 +271,19 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { ) ) } + + { +def df: DataFrame = spark.range(3).selectExpr("struct(id, id) as key", "id as value") +lazy val left = df.selectExpr("key", "concat('L', value) as value").alias("left") +lazy val right = df.selectExpr("key", "concat('R', value) as value").alias("right") +testInnerJoin( + "SPARK-15822 - test structs as keys", + left, + right, + () => (left.col("key") === right.col("key")).expr, + Seq( +(Row(0, 0), "L0", Row(0, 0), "R0"), +(Row(1, 1), "L1", Row(1, 1), "R1"), +(Row(2, 2), "L2", Row(2, 2), "R2"))) + } }
spark git commit: [SPARK-15738][PYSPARK][ML] Adding Pyspark ml RFormula __str__ method similar to Scala API
Repository: spark Updated Branches: refs/heads/branch-2.0 8b6742a37 -> 80b8711b3 [SPARK-15738][PYSPARK][ML] Adding Pyspark ml RFormula __str__ method similar to Scala API ## What changes were proposed in this pull request? Adding a `__str__` method to RFormula and its model that shows the formula param that was set and the resolved formula. This is currently present in the Scala API, and was found missing in PySpark during the Spark 2.0 coverage review. ## How was this patch tested? Ran pyspark-ml tests locally. Author: Bryan Cutler Closes #13481 from BryanCutler/pyspark-ml-rformula_str-SPARK-15738. (cherry picked from commit 7d7a0a5e0749909e97d90188707cc9220a1bb73a) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80b8711b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80b8711b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80b8711b Branch: refs/heads/branch-2.0 Commit: 80b8711b342c5a569fe89d7ffbdd552653b9b6ec Parents: 8b6742a Author: Bryan Cutler Authored: Fri Jun 10 11:27:30 2016 -0700 Committer: Yanbo Liang Committed: Fri Jun 10 14:01:55 2016 -0700 -- .../scala/org/apache/spark/ml/feature/RFormula.scala | 2 +- .../org/apache/spark/ml/feature/RFormulaParser.scala | 14 +- python/pyspark/ml/feature.py | 12 3 files changed, 26 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/80b8711b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index 2916b6d..a7ca0fe 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -182,7 +182,7 @@ class RFormula(override val uid: String) override def copy(extra: ParamMap): RFormula = defaultCopy(extra) - override def toString: String = s"RFormula(${get(formula)}) (uid=$uid)" + override def toString: String = s"RFormula(${get(formula).getOrElse("")}) (uid=$uid)" } @Since("2.0.0") http://git-wip-us.apache.org/repos/asf/spark/blob/80b8711b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala index 19aecff..2dd565a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala @@ -126,7 +126,19 @@ private[ml] case class ParsedRFormula(label: ColumnRef, terms: Seq[Term]) { * @param hasIntercept whether the formula specifies fitting with an intercept. */ private[ml] case class ResolvedRFormula( - label: String, terms: Seq[Seq[String]], hasIntercept: Boolean) + label: String, terms: Seq[Seq[String]], hasIntercept: Boolean) { + + override def toString: String = { +val ts = terms.map { + case t if t.length > 1 => +s"${t.mkString("{", ",", "}")}" + case t => +t.mkString +} +val termStr = ts.mkString("[", ",", "]") +s"ResolvedRFormula(label=$label, terms=$termStr, hasIntercept=$hasIntercept)" + } +} /** * R formula terms.
See the R formula docs here for more information: http://git-wip-us.apache.org/repos/asf/spark/blob/80b8711b/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index bfb2fb7..ca77ac3 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -2528,6 +2528,8 @@ class RFormula(JavaEstimator, HasFeaturesCol, HasLabelCol, JavaMLReadable, JavaM True >>> loadedRF.getLabelCol() == rf.getLabelCol() True +>>> str(loadedRF) +'RFormula(y ~ x + s) (uid=...)' >>> modelPath = temp_path + "/rFormulaModel" >>> model.save(modelPath) >>> loadedModel = RFormulaModel.load(modelPath) @@ -2542,6 +2544,8 @@ class RFormula(JavaEstimator, HasFeaturesCol, HasLabelCol, JavaMLReadable, JavaM |0.0|0.0| a|[0.0,1.0]| 0.0| +---+---+---+-+-+ ... +>>> str(loadedModel) +'RFormulaModel(ResolvedRFormula(label=y, terms=[x,s], hasIntercept=true)) (uid=...)' .. versionadded:: 1.5.0 """ @@ -2586,6 +2590,10 @@ class RFormula(JavaEstimator, HasFeaturesCol, HasLabelCol, JavaMLReadable, JavaM def _create_model(self, java_model): return RFormulaModel(java_model) +def __str__(self): +formulaStr = self.getFormula()
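On the Scala side, the behavior the new PySpark doctests mirror looks roughly like this; the toy data frame is illustrative, and the printed uids are elided:

```scala
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.SparkSession

object RFormulaToString {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rformula-str").getOrCreate()
    val df = spark.createDataFrame(Seq(
      (1.0, 1.0, "a"),
      (0.0, 2.0, "b")
    )).toDF("y", "x", "s")

    val formula = new RFormula().setFormula("y ~ x + s")
    println(formula) // RFormula(y ~ x + s) (uid=...)

    // Fitting resolves the formula against the schema.
    val model = formula.fit(df)
    println(model)   // RFormulaModel(ResolvedRFormula(label=y, terms=[x,s], hasIntercept=true)) (uid=...)

    spark.stop()
  }
}
```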
spark git commit: [SPARK-15875] Try to use Seq.isEmpty and Seq.nonEmpty instead of Seq.length == 0 and Seq.length > 0
Repository: spark Updated Branches: refs/heads/branch-2.0 96bb1476c -> 8b6742a37 [SPARK-15875] Try to use Seq.isEmpty and Seq.nonEmpty instead of Seq.length == 0 and Seq.length > 0 ## What changes were proposed in this pull request? In Scala, immutable.List.length is an expensive (O(n)) operation, so we should avoid using Seq.length == 0 or Seq.length > 0 and use Seq.isEmpty and Seq.nonEmpty instead. ## How was this patch tested? Existing tests. Author: wangyang Closes #13601 from yangw1234/isEmpty. (cherry picked from commit 026eb90644be7685971dacaabae67a293edd0133) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b6742a3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b6742a3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b6742a3 Branch: refs/heads/branch-2.0 Commit: 8b6742a37d35520eedaee5f3112529136b3a21e4 Parents: 96bb147 Author: wangyang Authored: Fri Jun 10 13:10:03 2016 -0700 Committer: Reynold Xin Committed: Fri Jun 10 13:10:09 2016 -0700 -- .../org/apache/spark/api/python/PythonWorkerFactory.scala | 2 +- .../scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala | 2 +- .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 6 +++--- .../main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 2 +- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala| 2 +- .../spark/sql/execution/aggregate/SortAggregateExec.scala | 4 ++-- .../org/apache/spark/sql/execution/metric/SQLMetrics.scala | 2 +- .../apache/spark/streaming/dstream/TransformedDStream.scala| 2 +- 9 files changed, 12 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8b6742a3/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 3df87f6..6a5e6f7 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -235,7 +235,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } private def cleanupIdleWorkers() { -while (idleWorkers.length > 0) { +while (idleWorkers.nonEmpty) { val worker = idleWorkers.dequeue() try { // the worker will exit after closing the socket http://git-wip-us.apache.org/repos/asf/spark/blob/8b6742a3/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala index b6366f3..d744d67 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala @@ -60,7 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag]( sc: SparkContext, var rdds: Seq[RDD[T]] ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) { - require(rdds.length > 0) + require(rdds.nonEmpty) require(rdds.forall(_.partitioner.isDefined)) require(rdds.flatMap(_.partitioner).toSet.size == 1, "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner)) http://git-wip-us.apache.org/repos/asf/spark/blob/8b6742a3/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -- diff --git
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index fc71f83..6ddc72a 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -375,14 +375,14 @@ class ExternalAppendOnlyMap[K, V, C]( /** * Return true if there exists an input stream that still has unvisited pairs. */ -override def hasNext: Boolean = mergeHeap.length > 0 +override def hasNext: Boolean = mergeHeap.nonEmpty /** * Select a key with the minimum hash, then combine all values with the same key from all * input streams. */ override def next(): (K, C) = { - if (mergeHeap.length == 0) { + if (mergeHeap.isEmpty) { throw new NoSuchElementException } // Select a key from the Stre
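The cost difference is easy to demonstrate outside Spark; timings below are indicative only, the point is O(n) vs O(1):

```scala
object EmptinessCheck {
  def main(args: Array[String]): Unit = {
    val xs: Seq[Int] = List.fill(10000000)(1)

    // List.length walks every cons cell: O(n).
    var t0 = System.nanoTime()
    val byLength = xs.length > 0
    println(s"length > 0: $byLength in ${(System.nanoTime() - t0) / 1000} us")

    // nonEmpty only inspects the head: O(1).
    t0 = System.nanoTime()
    val byNonEmpty = xs.nonEmpty
    println(s"nonEmpty: $byNonEmpty in ${(System.nanoTime() - t0) / 1000} us")
  }
}
```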
spark git commit: [SPARK-15875] Try to use Seq.isEmpty and Seq.nonEmpty instead of Seq.length == 0 and Seq.length > 0
Repository: spark Updated Branches: refs/heads/master 865ec32dd -> 026eb9064 [SPARK-15875] Try to use Seq.isEmpty and Seq.nonEmpty instead of Seq.length == 0 and Seq.length > 0 ## What changes were proposed in this pull request? In Scala, immutable.List.length is an expensive (O(n)) operation, so we should avoid using Seq.length == 0 or Seq.length > 0 and use Seq.isEmpty and Seq.nonEmpty instead. ## How was this patch tested? Existing tests. Author: wangyang Closes #13601 from yangw1234/isEmpty. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/026eb906 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/026eb906 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/026eb906 Branch: refs/heads/master Commit: 026eb90644be7685971dacaabae67a293edd0133 Parents: 865ec32 Author: wangyang Authored: Fri Jun 10 13:10:03 2016 -0700 Committer: Reynold Xin Committed: Fri Jun 10 13:10:03 2016 -0700 -- .../org/apache/spark/api/python/PythonWorkerFactory.scala | 2 +- .../scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala | 2 +- .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 6 +++--- .../main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 2 +- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala| 2 +- .../spark/sql/execution/aggregate/SortAggregateExec.scala | 4 ++-- .../org/apache/spark/sql/execution/metric/SQLMetrics.scala | 2 +- .../apache/spark/streaming/dstream/TransformedDStream.scala| 2 +- 9 files changed, 12 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/026eb906/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 3df87f6..6a5e6f7 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -235,7 +235,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } private def cleanupIdleWorkers() { -while (idleWorkers.length > 0) { +while (idleWorkers.nonEmpty) { val worker = idleWorkers.dequeue() try { // the worker will exit after closing the socket http://git-wip-us.apache.org/repos/asf/spark/blob/026eb906/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala index b6366f3..d744d67 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala @@ -60,7 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag]( sc: SparkContext, var rdds: Seq[RDD[T]] ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) { - require(rdds.length > 0) + require(rdds.nonEmpty) require(rdds.forall(_.partitioner.isDefined)) require(rdds.flatMap(_.partitioner).toSet.size == 1, "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner)) http://git-wip-us.apache.org/repos/asf/spark/blob/026eb906/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index fc71f83..6ddc72a 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -375,14 +375,14 @@ class ExternalAppendOnlyMap[K, V, C]( /** * Return true if there exists an input stream that still has unvisited pairs. */ -override def hasNext: Boolean = mergeHeap.length > 0 +override def hasNext: Boolean = mergeHeap.nonEmpty /** * Select a key with the minimum hash, then combine all values with the same key from all * input streams. */ override def next(): (K, C) = { - if (mergeHeap.length == 0) { + if (mergeHeap.isEmpty) { throw new NoSuchElementException } // Select a key from the StreamBuffer that holds the lowest key hash @@ -397,7 +397,7 @@ class ExternalAppendOnlyMap[K, V, C]( /
spark git commit: [SPARK-6320][SQL] Move planLater method into GenericStrategy.
Repository: spark Updated Branches: refs/heads/master fb219029d -> 667d4ea7b [SPARK-6320][SQL] Move planLater method into GenericStrategy. ## What changes were proposed in this pull request? This PR moves the `QueryPlanner.planLater()` method into `GenericStrategy` so that extra strategies are able to use `planLater` in their own planning. ## How was this patch tested? Existing tests. Author: Takuya UESHIN Closes #13147 from ueshin/issues/SPARK-6320. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/667d4ea7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/667d4ea7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/667d4ea7 Branch: refs/heads/master Commit: 667d4ea7b35f285954ea7cb719b7c80581e31f4d Parents: fb21902 Author: Takuya UESHIN Authored: Fri Jun 10 13:06:18 2016 -0700 Committer: Michael Armbrust Committed: Fri Jun 10 13:06:18 2016 -0700 -- .../sql/catalyst/planning/QueryPlanner.scala| 58 ++ .../spark/sql/execution/QueryExecution.scala| 2 + .../spark/sql/execution/SparkPlanner.scala | 13 .../spark/sql/execution/SparkStrategies.scala | 23 +++ .../scala/org/apache/spark/sql/package.scala| 4 +- .../spark/sql/execution/SparkPlannerSuite.scala | 63 6 files changed, 151 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/667d4ea7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 8b1a34f..5f694f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * empty list should be returned. */ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { + + /** + * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be + * filled in automatically by the QueryPlanner using the other execution strategies that are + * available. + */ + protected def planLater(plan: LogicalPlan): PhysicalPlan + def apply(plan: LogicalPlan): Seq[PhysicalPlan] } @@ -47,17 +55,47 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { /** A list of execution strategies that can be used by the planner */ def strategies: Seq[GenericStrategy[PhysicalPlan]] - /** - * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be - * filled in automatically by the QueryPlanner using the other execution strategies that are - * available. - */ - protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next() - def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... -val iter = strategies.view.flatMap(_(plan)).toIterator -assert(iter.hasNext, s"No plan for $plan") -iter + +// Collect physical plan candidates. +val candidates = strategies.iterator.flatMap(_(plan)) + +// The candidates may contain placeholders marked as [[planLater]], +// so try to replace them by their child plans. +val plans = candidates.flatMap { candidate => + val placeholders = collectPlaceholders(candidate) + + if (placeholders.isEmpty) { +// Take the candidate as is because it does not contain placeholders.
+Iterator(candidate) + } else { +// Plan the logical plan marked as [[planLater]] and replace the placeholders. +placeholders.iterator.foldLeft(Iterator(candidate)) { + case (candidatesWithPlaceholders, (placeholder, logicalPlan)) => +// Plan the logical plan for the placeholder. +val childPlans = this.plan(logicalPlan) + +candidatesWithPlaceholders.flatMap { candidateWithPlaceholders => + childPlans.map { childPlan => +// Replace the placeholder by the child plan +candidateWithPlaceholders.transformUp { + case p if p == placeholder => childPlan +} + } +} +} + } +} + +val pruned = prunePlans(plans) +assert(pruned.hasNext, s"No plan for $plan") +pruned } + + /** Collects placeholders marked as [[planLater]] by strategy and its [[LogicalPlan]]s */ + protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)] + + /** Prunes bad plans to prevent com
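What this unlocks for users: an extra strategy registered through the experimental hook can now delegate subplans back to the planner. A sketch under that assumption; the strategy itself is illustrative, while `Project`, `ProjectExec`, and `spark.experimental.extraStrategies` are existing Spark names:

```scala
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}

// An extra strategy that plans Project nodes itself and hands the child
// back to the planner via planLater — possible now that planLater lives
// on GenericStrategy rather than on QueryPlanner.
object ProjectFirstStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case Project(projectList, child) =>
      ProjectExec(projectList, planLater(child)) :: Nil
    case _ => Nil
  }
}

object PlanLaterExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("planLater").getOrCreate()
    spark.experimental.extraStrategies = Seq(ProjectFirstStrategy)
    spark.range(5).selectExpr("id + 1 as next").explain()
    spark.stop()
  }
}
```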
spark git commit: [MINOR][X][X] Replace all occurrences of None: Option with Option.empty
Repository: spark Updated Branches: refs/heads/master 667d4ea7b -> 865ec32dd [MINOR][X][X] Replace all occurrences of None: Option with Option.empty ## What changes were proposed in this pull request? Replace all occurrences of `None: Option[X]` with `Option.empty[X]` ## How was this patch tested? Existing tests. Author: Sandeep Singh Closes #13591 from techaddict/minor-7. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/865ec32d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/865ec32d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/865ec32d Branch: refs/heads/master Commit: 865ec32dd997e63aea01a871d1c7b4947f43c111 Parents: 667d4ea Author: Sandeep Singh Authored: Fri Jun 10 13:06:51 2016 -0700 Committer: Reynold Xin Committed: Fri Jun 10 13:06:51 2016 -0700 -- .../org/apache/spark/sql/catalyst/trees/TreeNode.scala| 4 ++-- .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../sql/execution/command/createDataSourceTables.scala| 2 +- .../spark/sql/execution/exchange/ShuffleExchange.scala| 2 +- .../org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 10 +- .../spark/streaming/receiver/ReceivedBlockHandler.scala | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/865ec32d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index f924efe..3cc7a1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -105,7 +105,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { */ def find(f: BaseType => Boolean): Option[BaseType] = f(this) match { case true => Some(this) -case false => children.foldLeft(None: Option[BaseType]) { (l, r) => l.orElse(r.find(f)) } +case false => children.foldLeft(Option.empty[BaseType]) { (l, r) => l.orElse(r.find(f)) } } /** @@ -165,7 +165,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] = { val lifted = pf.lift lifted(this).orElse { - children.foldLeft(None: Option[B]) { (l, r) => l.orElse(r.collectFirst(pf)) } + children.foldLeft(Option.empty[B]) { (l, r) => l.orElse(r.collectFirst(pf)) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/865ec32d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 78b74f9..1c2003c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -503,7 +503,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private def insertInto(tableIdent: TableIdentifier): Unit = { assertNotBucketed("insertInto") assertNotStreaming("insertInto() can only be called on non-continuous queries") -val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap) +val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap) val overwrite = mode == SaveMode.Overwrite df.sparkSession.sessionState.executePlan(
http://git-wip-us.apache.org/repos/asf/spark/blob/865ec32d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 66753fa..865e406 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -169,7 +169,7 @@ case class CreateDataSourceTableAsSelectCommand( options } -var existingSchema = None: Option[StructType] +var existingSchema = Option.empty[StructType] if (sparkSession.sessionState.catalog.tableExists(tableIdent)) { // Check if we need to throw an exception or just return. mode match { http://git-wip-us.apache.org
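The two spellings are equivalent; the patch just prefers the more direct one. A tiny sketch (the variable names are illustrative):

```scala
object OptionEmptyStyle {
  def main(args: Array[String]): Unit = {
    // Old style: ascribe None to the desired Option type.
    var schemaOld = None: Option[String]
    // New style: ask Option for an empty value of that type directly.
    var schemaNew = Option.empty[String]

    assert(schemaOld == schemaNew) // both are None, typed as Option[String]
    schemaNew = Some("struct<id:bigint>")
    println(schemaNew)
  }
}
```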
spark git commit: [MINOR][X][X] Replace all occurrences of None: Option with Option.empty
Repository: spark Updated Branches: refs/heads/branch-2.0 f15d641e2 -> 96bb1476c [MINOR][X][X] Replace all occurrences of None: Option with Option.empty ## What changes were proposed in this pull request? Replace all occurrences of `None: Option[X]` with `Option.empty[X]` ## How was this patch tested? Existing tests. Author: Sandeep Singh Closes #13591 from techaddict/minor-7. (cherry picked from commit 865ec32dd997e63aea01a871d1c7b4947f43c111) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96bb1476 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96bb1476 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96bb1476 Branch: refs/heads/branch-2.0 Commit: 96bb1476ce884168f232d3e63aa21b5f7dba474f Parents: f15d641 Author: Sandeep Singh Authored: Fri Jun 10 13:06:51 2016 -0700 Committer: Reynold Xin Committed: Fri Jun 10 13:06:57 2016 -0700 -- .../org/apache/spark/sql/catalyst/trees/TreeNode.scala| 4 ++-- .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../sql/execution/command/createDataSourceTables.scala| 2 +- .../spark/sql/execution/exchange/ShuffleExchange.scala| 2 +- .../org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 10 +- .../spark/streaming/receiver/ReceivedBlockHandler.scala | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/96bb1476/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index f924efe..3cc7a1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -105,7 +105,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { */ def find(f: BaseType => Boolean): Option[BaseType] = f(this) match { case true => Some(this) -case false => children.foldLeft(None: Option[BaseType]) { (l, r) => l.orElse(r.find(f)) } +case false => children.foldLeft(Option.empty[BaseType]) { (l, r) => l.orElse(r.find(f)) } } /** @@ -165,7 +165,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] = { val lifted = pf.lift lifted(this).orElse { - children.foldLeft(None: Option[B]) { (l, r) => l.orElse(r.collectFirst(pf)) } + children.foldLeft(Option.empty[B]) { (l, r) => l.orElse(r.collectFirst(pf)) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/96bb1476/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 78b74f9..1c2003c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -503,7 +503,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private def insertInto(tableIdent: TableIdentifier): Unit = { assertNotBucketed("insertInto") assertNotStreaming("insertInto() can only be called on non-continuous queries") -val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap) +val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap) val
overwrite = mode == SaveMode.Overwrite df.sparkSession.sessionState.executePlan( http://git-wip-us.apache.org/repos/asf/spark/blob/96bb1476/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 66753fa..865e406 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -169,7 +169,7 @@ case class CreateDataSourceTableAsSelectCommand( options } -var existingSchema = None: Option[StructType] +var existingSchema = Option.empty[StructType] if (sparkSession.sessionState.catalog.tableExists(tableIdent)) { /
spark git commit: [SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`
Repository: spark Updated Branches: refs/heads/branch-2.0 c1390ccbb -> f15d641e2 [SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter` ## What changes were proposed in this pull request? It doesn't make sense to specify partitioning parameters when we write data out from Datasets/DataFrames into `jdbc` tables or streaming `ForeachWriter`s. This patch adds an `assertNotPartitioned` check in `DataFrameWriter`. Of the writer operations (mode, outputMode, trigger, format, option/options, partitionBy, bucketBy, sortBy, save, queryName, startStream, foreach, insertInto, saveAsTable, jdbc, json, parquet, orc, text, csv), only `foreach` and `jdbc` should check that the output is not partitioned. ## How was this patch tested? New dedicated tests. Author: Liwei Lin Closes #13597 from lw-lin/add-assertNotPartitioned. (cherry picked from commit fb219029dd1b8d2783c3e202361401048296595c) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f15d641e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f15d641e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f15d641e Branch: refs/heads/branch-2.0 Commit: f15d641e297d425a8c1b4ba6c93f4f98a3f70d0f Parents: c1390cc Author: Liwei Lin Authored: Fri Jun 10 13:01:29 2016 -0700 Committer: Shixiong Zhu Committed: Fri Jun 10 13:01:37 2016 -0700 -- .../org/apache/spark/sql/DataFrameWriter.scala | 12 +- .../test/DataFrameReaderWriterSuite.scala | 42 ++-- .../spark/sql/sources/BucketedWriteSuite.scala | 8 ++-- 3 files changed, 52 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 6ce59e8..78b74f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -432,6 +432,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ @Experimental def foreach(writer: ForeachWriter[T]): ContinuousQuery = { +assertNotPartitioned("foreach") assertNotBucketed("foreach") assertStreaming( "foreach() can only be called on streaming Datasets/DataFrames.") @@ -562,8 +563,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private def assertNotBucketed(operation: String): Unit = { if (numBuckets.isDefined || sortColumnNames.isDefined) { - throw new IllegalArgumentException( -s"'$operation' does not support bucketing right now.") + throw new AnalysisException(s"'$operation' does not support bucketing right now") +} + } + + private def assertNotPartitioned(operation: String): Unit = { +if (partitioningColumns.isDefined) { + throw new AnalysisException( s"'$operation' does not support partitioning") } } @@ -646,6 +652,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { +assertNotPartitioned("jdbc") +assertNotBucketed("jdbc") assertNotStreaming("jdbc() can only be called on non-continuous queries") val props = new Properties() http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala index bf6063a..6e0d66a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala @@ -455,8 +455,8 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter { .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write -val e = intercept[IllegalArgumentException](w.bucketBy(1, "text").startStream()) -assert(e.getMessage == "'startStream' does not support
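A sketch of the user-facing effect; the check fires during analysis, before any connection is made, so the JDBC URL and table below are placeholders:

```scala
import java.util.Properties
import org.apache.spark.sql.{AnalysisException, SparkSession}

object JdbcPartitionByCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("jdbc-check").getOrCreate()
    val df = spark.range(10).selectExpr("id", "id % 2 as year")

    try {
      df.write
        .partitionBy("year") // partitioning is meaningless for JDBC tables
        .jdbc("jdbc:h2:mem:testdb", "events", new Properties()) // placeholder URL/table
    } catch {
      case e: AnalysisException =>
        println(e.getMessage) // 'jdbc' does not support partitioning
    }
    spark.stop()
  }
}
```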
spark git commit: [SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`
Repository: spark Updated Branches: refs/heads/master 5c16ad0d5 -> fb219029d [SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter` ## What changes were proposed in this pull request? It doesn't make sense to specify partitioning parameters when we write data out from Datasets/DataFrames into `jdbc` tables or streaming `ForeachWriter`s. This patch adds an `assertNotPartitioned` check in `DataFrameWriter`. Of the writer operations (mode, outputMode, trigger, format, option/options, partitionBy, bucketBy, sortBy, save, queryName, startStream, foreach, insertInto, saveAsTable, jdbc, json, parquet, orc, text, csv), only `foreach` and `jdbc` should check that the output is not partitioned. ## How was this patch tested? New dedicated tests. Author: Liwei Lin Closes #13597 from lw-lin/add-assertNotPartitioned. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb219029 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb219029 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb219029 Branch: refs/heads/master Commit: fb219029dd1b8d2783c3e202361401048296595c Parents: 5c16ad0 Author: Liwei Lin Authored: Fri Jun 10 13:01:29 2016 -0700 Committer: Shixiong Zhu Committed: Fri Jun 10 13:01:29 2016 -0700 -- .../org/apache/spark/sql/DataFrameWriter.scala | 12 +- .../test/DataFrameReaderWriterSuite.scala | 42 ++-- .../spark/sql/sources/BucketedWriteSuite.scala | 8 ++-- 3 files changed, 52 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fb219029/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 6ce59e8..78b74f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -432,6 +432,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ @Experimental def foreach(writer: ForeachWriter[T]): ContinuousQuery = { +assertNotPartitioned("foreach") assertNotBucketed("foreach") assertStreaming( "foreach() can only be called on streaming Datasets/DataFrames.") @@ -562,8 +563,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private def assertNotBucketed(operation: String): Unit = { if (numBuckets.isDefined || sortColumnNames.isDefined) { - throw new IllegalArgumentException( -s"'$operation' does not support bucketing right now.") + throw new AnalysisException(s"'$operation' does not support bucketing right now") +} + } + + private def assertNotPartitioned(operation: String): Unit = { +if (partitioningColumns.isDefined) { + throw new AnalysisException( s"'$operation' does not support partitioning") } } @@ -646,6 +652,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { +assertNotPartitioned("jdbc") +assertNotBucketed("jdbc") assertNotStreaming("jdbc() can only be called on non-continuous queries") val props = new Properties() http://git-wip-us.apache.org/repos/asf/spark/blob/fb219029/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala index bf6063a..6e0d66a 100644 ---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala @@ -455,8 +455,8 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter { .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write -val e = intercept[IllegalArgumentException](w.bucketBy(1, "text").startStream()) -assert(e.getMessage == "'startStream' does not support bucketing right now.") +val e = intercept[AnalysisException](w.bucketBy(1, "text").startStream()) +
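A minimal sketch of the new fail-fast behavior, assuming a running `SparkSession` named `spark`; the H2 JDBC URL and table name are placeholders, and the printed message paraphrases the new `assertNotPartitioned` check:

```scala
import java.util.Properties
import org.apache.spark.sql.AnalysisException

val df = spark.range(10).toDF("id")
try {
  df.write
    .partitionBy("id") // partitioning parameters are meaningless for a JDBC table
    .jdbc("jdbc:h2:mem:testdb", "test_table", new Properties())
} catch {
  case e: AnalysisException =>
    println(e.getMessage) // 'jdbc' does not support partitioning
}
```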
spark git commit: [SPARK-15766][SPARKR] R should export is.nan
Repository: spark Updated Branches: refs/heads/branch-2.0 8dd82f8de -> f895d6d85 [SPARK-15766][SPARKR] R should export is.nan ## What changes were proposed in this pull request? When reviewing SPARK-15545, we found that is.nan is not exported even though it should be. Add it to the NAMESPACE. ## How was this patch tested? Manual tests. Author: wm...@hotmail.com Closes #13508 from wangmiao1981/unused. (cherry picked from commit 2c8f40cea113b597fbaf1cdd80a5b8bdd66155fb) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f895d6d8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f895d6d8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f895d6d8 Branch: refs/heads/branch-2.0 Commit: f895d6d859bc3b259abe8bc39cf8367e3e72a243 Parents: 8dd82f8 Author: wm...@hotmail.com Authored: Fri Jun 10 12:46:22 2016 -0700 Committer: Shivaram Venkataraman Committed: Fri Jun 10 12:46:31 2016 -0700 -- R/pkg/NAMESPACE | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f895d6d8/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 239ad06..ba386da 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -185,6 +185,8 @@ exportMethods("%in%", "isNaN", "isNotNull", "isNull", + "is.nan", + "isnan", "kurtosis", "lag", "last",
spark git commit: Revert [SPARK-14485][CORE] ignore task finished for executor lost
Repository: spark Updated Branches: refs/heads/branch-2.0 f895d6d85 -> c1390ccbb Revert [SPARK-14485][CORE] ignore task finished for executor lost This reverts commit 695dbc816a6d70289abeb145cb62ff4e62b3f49b. This change is being reverted because it hurts performance of some jobs, and only helps in a narrow set of cases. For more discussion, refer to the JIRA. Author: Kay Ousterhout Closes #13580 from kayousterhout/revert-SPARK-14485. (cherry picked from commit 5c16ad0d522e5124a6977533077afb7b38fc42a1) Signed-off-by: Kay Ousterhout Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1390ccb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1390ccb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1390ccb Branch: refs/heads/branch-2.0 Commit: c1390ccbb2968156245e267e6c5cd2a27f7d0121 Parents: f895d6d Author: Kay Ousterhout Authored: Fri Jun 10 12:50:27 2016 -0700 Committer: Kay Ousterhout Committed: Fri Jun 10 12:51:29 2016 -0700 -- .../apache/spark/scheduler/TaskSchedulerImpl.scala| 14 +- 1 file changed, 1 insertion(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c1390ccb/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d6f58e4..01e85ca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -346,11 +346,9 @@ private[spark] class TaskSchedulerImpl( } taskIdToTaskSetManager.get(tid) match { case Some(taskSet) => -var executorId: String = null if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid).foreach { execId => -executorId = execId if (executorIdToTaskCount.contains(execId)) { executorIdToTaskCount(execId) -= 1 } @@ -358,17 +356,7 @@ private[spark] class TaskSchedulerImpl( } if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) - // In some case, executor has already been removed by driver for heartbeats timeout, - // but at sometime, before executor killed by cluster, the task of running on this - // executor is finished and return task success state to driver. However, this kinds - // of task should be ignored, because the task on this executor is already re-queued - // by driver. For more details, can check in SPARK-14485. - if (executorId != null && !executorIdToTaskCount.contains(executorId)) { -logInfo(s"Ignoring update with state $state for TID $tid because its executor " + - s"has already been removed by driver") - } else { -taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) - } + taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { taskSet.removeRunningTask(tid) taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
spark git commit: Revert [SPARK-14485][CORE] ignore task finished for executor lost
Repository: spark Updated Branches: refs/heads/master 2c8f40cea -> 5c16ad0d5 Revert [SPARK-14485][CORE] ignore task finished for executor lost This reverts commit 695dbc816a6d70289abeb145cb62ff4e62b3f49b. This change is being reverted because it hurts performance of some jobs, and only helps in a narrow set of cases. For more discussion, refer to the JIRA. Author: Kay Ousterhout Closes #13580 from kayousterhout/revert-SPARK-14485. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c16ad0d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c16ad0d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c16ad0d Branch: refs/heads/master Commit: 5c16ad0d522e5124a6977533077afb7b38fc42a1 Parents: 2c8f40c Author: Kay Ousterhout Authored: Fri Jun 10 12:50:27 2016 -0700 Committer: Kay Ousterhout Committed: Fri Jun 10 12:50:50 2016 -0700 -- .../apache/spark/scheduler/TaskSchedulerImpl.scala| 14 +- 1 file changed, 1 insertion(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5c16ad0d/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8e1d957..c3adc28 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -352,11 +352,9 @@ private[spark] class TaskSchedulerImpl( } taskIdToTaskSetManager.get(tid) match { case Some(taskSet) => -var executorId: String = null if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid).foreach { execId => -executorId = execId if (executorIdToTaskCount.contains(execId)) { executorIdToTaskCount(execId) -= 1 } @@ -364,17 +362,7 @@ private[spark] class TaskSchedulerImpl( } if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) - // In some case, executor has already been removed by driver for heartbeats timeout, - // but at sometime, before executor killed by cluster, the task of running on this - // executor is finished and return task success state to driver. However, this kinds - // of task should be ignored, because the task on this executor is already re-queued - // by driver. For more details, can check in SPARK-14485. - if (executorId != null && !executorIdToTaskCount.contains(executorId)) { -logInfo(s"Ignoring update with state $state for TID $tid because its executor " + - s"has already been removed by driver") - } else { -taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) - } + taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { taskSet.removeRunningTask(tid) taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
spark git commit: [SPARK-15766][SPARKR] R should export is.nan
Repository: spark Updated Branches: refs/heads/master 2413fce9d -> 2c8f40cea [SPARK-15766][SPARKR] R should export is.nan ## What changes were proposed in this pull request? When reviewing SPARK-15545, we found that is.nan is not exported even though it should be. Add it to the NAMESPACE. ## How was this patch tested? Manual tests. Author: wm...@hotmail.com Closes #13508 from wangmiao1981/unused. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c8f40ce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c8f40ce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c8f40ce Branch: refs/heads/master Commit: 2c8f40cea113b597fbaf1cdd80a5b8bdd66155fb Parents: 2413fce Author: wm...@hotmail.com Authored: Fri Jun 10 12:46:22 2016 -0700 Committer: Shivaram Venkataraman Committed: Fri Jun 10 12:46:22 2016 -0700 -- R/pkg/NAMESPACE | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2c8f40ce/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 239ad06..ba386da 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -185,6 +185,8 @@ exportMethods("%in%", "isNaN", "isNotNull", "isNull", + "is.nan", + "isnan", "kurtosis", "lag", "last",
spark git commit: [SPARK-15743][SQL] Prevent saving with all-column partitioning
Repository: spark Updated Branches: refs/heads/master 7d7a0a5e0 -> 2413fce9d [SPARK-15743][SQL] Prevent saving with all-column partitioning ## What changes were proposed in this pull request? When saving datasets on storage, `partitionBy` provides an easy way to construct the directory structure. However, if a user chooses all columns as partition columns, exceptions occur. - **ORC with all-column partitioning**: `AnalysisException` on **future read** due to schema inference failure. ```scala scala> spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data") scala> spark.read.format("orc").load("/tmp/data").collect() org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC at /tmp/data. It must be specified manually; ``` - **Parquet with all-column partitioning**: `InvalidSchemaException` on **write execution** due to a Parquet limitation. ```scala scala> spark.range(100).write.format("parquet").mode("overwrite").partitionBy("id").save("/tmp/data") [Stage 0:> (0 + 8) / 8]16/06/02 16:51:17 ERROR Utils: Aborting task org.apache.parquet.schema.InvalidSchemaException: A group type can not be empty. Parquet does not support empty group without leaves. Empty group: spark_schema ... (lots of error messages) ``` Although some formats like JSON support all-column partitioning without any problem, it does not seem like a good idea to create lots of empty directories. This PR prevents saving with all-column partitioning by consistently raising `AnalysisException` before executing the save operation. ## How was this patch tested? Newly added `PartitioningUtilsSuite`. Author: Dongjoon Hyun Closes #13486 from dongjoon-hyun/SPARK-15743. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2413fce9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2413fce9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2413fce9 Branch: refs/heads/master Commit: 2413fce9d6812a91eeffb4435c2b5b361d23214b Parents: 7d7a0a5 Author: Dongjoon Hyun Authored: Fri Jun 10 12:43:27 2016 -0700 Committer: Michael Armbrust Committed: Fri Jun 10 12:43:27 2016 -0700 -- .../sql/execution/datasources/DataSource.scala | 32 ++-- .../datasources/PartitioningUtils.scala | 8 +++-- .../spark/sql/execution/datasources/rules.scala | 4 +-- .../execution/streaming/FileStreamSink.scala| 2 +- .../test/DataFrameReaderWriterSuite.scala | 12 5 files changed, 37 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5f17fdf..d327302 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License.
You may obtain a copy of the License at -* -*http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
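A minimal sketch of the guard this patch adds, assuming a running `SparkSession` named `spark` (the path is a placeholder, and the exact exception message is paraphrased):

```scala
import org.apache.spark.sql.AnalysisException

try {
  // "id" is the only column here, so this is all-column partitioning.
  spark.range(10).write.format("parquet").partitionBy("id").save("/tmp/data")
} catch {
  case e: AnalysisException =>
    // Fails consistently during analysis, before any files are written.
    println(e.getMessage)
}
```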
spark git commit: [SPARK-15743][SQL] Prevent saving with all-column partitioning
Repository: spark Updated Branches: refs/heads/branch-2.0 935b6e0e4 -> 8dd82f8de [SPARK-15743][SQL] Prevent saving with all-column partitioning ## What changes were proposed in this pull request? When saving datasets on storage, `partitionBy` provides an easy way to construct the directory structure. However, if a user chooses all columns as partition columns, exceptions occur. - **ORC with all-column partitioning**: `AnalysisException` on **future read** due to schema inference failure. ```scala scala> spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data") scala> spark.read.format("orc").load("/tmp/data").collect() org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC at /tmp/data. It must be specified manually; ``` - **Parquet with all-column partitioning**: `InvalidSchemaException` on **write execution** due to a Parquet limitation. ```scala scala> spark.range(100).write.format("parquet").mode("overwrite").partitionBy("id").save("/tmp/data") [Stage 0:> (0 + 8) / 8]16/06/02 16:51:17 ERROR Utils: Aborting task org.apache.parquet.schema.InvalidSchemaException: A group type can not be empty. Parquet does not support empty group without leaves. Empty group: spark_schema ... (lots of error messages) ``` Although some formats like JSON support all-column partitioning without any problem, it does not seem like a good idea to create lots of empty directories. This PR prevents saving with all-column partitioning by consistently raising `AnalysisException` before executing the save operation. ## How was this patch tested? Newly added `PartitioningUtilsSuite`. Author: Dongjoon Hyun Closes #13486 from dongjoon-hyun/SPARK-15743. (cherry picked from commit 2413fce9d6812a91eeffb4435c2b5b361d23214b) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8dd82f8d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8dd82f8d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8dd82f8d Branch: refs/heads/branch-2.0 Commit: 8dd82f8de40a9ef54ef147f1acfb54a40d270c67 Parents: 935b6e0 Author: Dongjoon Hyun Authored: Fri Jun 10 12:43:27 2016 -0700 Committer: Michael Armbrust Committed: Fri Jun 10 12:43:40 2016 -0700 -- .../sql/execution/datasources/DataSource.scala | 32 ++-- .../datasources/PartitioningUtils.scala | 8 +++-- .../spark/sql/execution/datasources/rules.scala | 4 +-- .../execution/streaming/FileStreamSink.scala| 2 +- .../test/DataFrameReaderWriterSuite.scala | 12 5 files changed, 37 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8dd82f8d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5f17fdf..d327302 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License.
You may obtain a copy of the License at -* -*http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
spark git commit: [SPARK-15738][PYSPARK][ML] Adding Pyspark ml RFormula __str__ method similar to Scala API
Repository: spark Updated Branches: refs/heads/master 254bc8c34 -> 7d7a0a5e0 [SPARK-15738][PYSPARK][ML] Adding Pyspark ml RFormula __str__ method similar to Scala API ## What changes were proposed in this pull request? Adds a __str__ method to RFormula and its model that shows the set formula param and the resolved formula. This is currently present in the Scala API and was found missing in PySpark during the Spark 2.0 coverage review. ## How was this patch tested? Ran pyspark-ml tests locally. Author: Bryan Cutler Closes #13481 from BryanCutler/pyspark-ml-rformula_str-SPARK-15738. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d7a0a5e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d7a0a5e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d7a0a5e Branch: refs/heads/master Commit: 7d7a0a5e0749909e97d90188707cc9220a1bb73a Parents: 254bc8c Author: Bryan Cutler Authored: Fri Jun 10 11:27:30 2016 -0700 Committer: Yanbo Liang Committed: Fri Jun 10 11:27:30 2016 -0700 -- .../scala/org/apache/spark/ml/feature/RFormula.scala | 2 +- .../org/apache/spark/ml/feature/RFormulaParser.scala | 14 +- python/pyspark/ml/feature.py | 12 3 files changed, 26 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7d7a0a5e/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index 2916b6d..a7ca0fe 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -182,7 +182,7 @@ class RFormula(override val uid: String) override def copy(extra: ParamMap): RFormula = defaultCopy(extra) - override def toString: String = s"RFormula(${get(formula)}) (uid=$uid)" + override def toString: String = s"RFormula(${get(formula).getOrElse("")}) (uid=$uid)" } @Since("2.0.0") http://git-wip-us.apache.org/repos/asf/spark/blob/7d7a0a5e/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala index 19aecff..2dd565a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala @@ -126,7 +126,19 @@ private[ml] case class ParsedRFormula(label: ColumnRef, terms: Seq[Term]) { * @param hasIntercept whether the formula specifies fitting with an intercept. */ private[ml] case class ResolvedRFormula( - label: String, terms: Seq[Seq[String]], hasIntercept: Boolean) + label: String, terms: Seq[Seq[String]], hasIntercept: Boolean) { + + override def toString: String = { +val ts = terms.map { + case t if t.length > 1 => +s"${t.mkString("{", ",", "}")}" + case t => +t.mkString +} +val termStr = ts.mkString("[", ",", "]") +s"ResolvedRFormula(label=$label, terms=$termStr, hasIntercept=$hasIntercept)" + } +} /** * R formula terms.
See the R formula docs for more information.
http://git-wip-us.apache.org/repos/asf/spark/blob/7d7a0a5e/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index bfb2fb7..ca77ac3 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -2528,6 +2528,8 @@ class RFormula(JavaEstimator, HasFeaturesCol, HasLabelCol, JavaMLReadable, JavaM True >>> loadedRF.getLabelCol() == rf.getLabelCol() True +>>> str(loadedRF) +'RFormula(y ~ x + s) (uid=...)' >>> modelPath = temp_path + "/rFormulaModel" >>> model.save(modelPath) >>> loadedModel = RFormulaModel.load(modelPath) @@ -2542,6 +2544,8 @@ class RFormula(JavaEstimator, HasFeaturesCol, HasLabelCol, JavaMLReadable, JavaM |0.0|0.0| a|[0.0,1.0]| 0.0| +---+---+---+-+-+ ... +>>> str(loadedModel) +'RFormulaModel(ResolvedRFormula(label=y, terms=[x,s], hasIntercept=true)) (uid=...)' .. versionadded:: 1.5.0 """ @@ -2586,6 +2590,10 @@ class RFormula(JavaEstimator, HasFeaturesCol, HasLabelCol, JavaMLReadable, JavaM def _create_model(self, java_model): return RFormulaModel(java_model) +def __str__(self): +formulaStr = self.getFormula() if self.isDefined(self.formula) else "" +return "RFormula(%s) (uid=%s)" % (formulaStr, self.uid)
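The Scala behavior this mirrors, as a small sketch (the uid shown is illustrative):

```scala
import org.apache.spark.ml.feature.RFormula

val rf = new RFormula().setFormula("y ~ x + s")
println(rf)             // RFormula(y ~ x + s) (uid=rFormula_...)
println(new RFormula()) // RFormula() (uid=...) -- an unset formula now prints as empty
```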
svn commit: r1747764 - in /spark: css/ site/ site/css/ site/images/
Author: matei Date: Fri Jun 10 18:15:52 2016 New Revision: 1747764 URL: http://svn.apache.org/viewvc?rev=1747764&view=rev Log: CSS tweaks Modified: spark/css/custom.css spark/site/css/custom.css spark/site/documentation.html spark/site/images/spark-logo-reverse.eps spark/site/images/spark-logo-trademark.png spark/site/images/spark-logo.eps spark/site/images/spark-logo.png spark/site/images/spark-runs-everywhere.png Modified: spark/css/custom.css URL: http://svn.apache.org/viewvc/spark/css/custom.css?rev=1747764&r1=1747763&r2=1747764&view=diff == --- spark/css/custom.css (original) +++ spark/css/custom.css Fri Jun 10 18:15:52 2016 @@ -1,7 +1,8 @@ .tagline { display: inline-block; color: rgb(47, 164, 231); - padding-bottom: 5px; + padding-bottom: 7px; + padding-left: 10px; font-style: italic; } @@ -204,6 +205,7 @@ footer a { display: inline-block; font-weight: 200; color: #333; + padding-left: 9px; } .subproject { @@ -212,7 +214,7 @@ footer a { @media (min-width: 768px) { .subproject { -font-size: 45px; +font-size: 46px; } } Modified: spark/site/css/custom.css URL: http://svn.apache.org/viewvc/spark/site/css/custom.css?rev=1747764&r1=1747763&r2=1747764&view=diff == --- spark/site/css/custom.css (original) +++ spark/site/css/custom.css Fri Jun 10 18:15:52 2016 @@ -1,7 +1,8 @@ .tagline { display: inline-block; color: rgb(47, 164, 231); - padding-bottom: 5px; + padding-bottom: 7px; + padding-left: 10px; font-style: italic; } @@ -204,6 +205,7 @@ footer a { display: inline-block; font-weight: 200; color: #333; + padding-left: 9px; } .subproject { @@ -212,7 +214,7 @@ footer a { @media (min-width: 768px) { .subproject { -font-size: 45px; +font-size: 46px; } } Modified: spark/site/documentation.html URL: http://svn.apache.org/viewvc/spark/site/documentation.html?rev=1747764&r1=1747763&r2=1747764&view=diff == --- spark/site/documentation.html (original) +++ spark/site/documentation.html Fri Jun 10 18:15:52 2016 @@ -249,13 +249,12 @@ Meetup Talk Videos -In addition to the videos listed below, you can also view http://www.meetup.com/spark-users/files/";>all slides from Bay Area meetups here. +In addition to the videos listed below, you can also view http://www.meetup.com/spark-users/files/";>all slides from Bay Area meetups here. .video-meta-info { font-size: 0.95em; } - - + http://www.youtube.com/watch?v=NUQ-8to2XAk&list=PL-x35fyliRwiP3YteXbnhk0QGOtYLBT3a";>Spark 1.0 and Beyond (http://files.meetup.com/3138542/Spark%201.0%20Meetup.ppt";>slides) by Patrick Wendell, at Cisco in San Jose, 2014-04-23 Modified: spark/site/images/spark-logo-reverse.eps URL: http://svn.apache.org/viewvc/spark/site/images/spark-logo-reverse.eps?rev=1747764&r1=1747763&r2=1747764&view=diff == Binary files - no diff available. Modified: spark/site/images/spark-logo-trademark.png URL: http://svn.apache.org/viewvc/spark/site/images/spark-logo-trademark.png?rev=1747764&r1=1747763&r2=1747764&view=diff == Binary files - no diff available. Modified: spark/site/images/spark-logo.eps URL: http://svn.apache.org/viewvc/spark/site/images/spark-logo.eps?rev=1747764&r1=1747763&r2=1747764&view=diff == Binary files - no diff available. Modified: spark/site/images/spark-logo.png URL: http://svn.apache.org/viewvc/spark/site/images/spark-logo.png?rev=1747764&r1=1747763&r2=1747764&view=diff == Binary files - no diff available. Modified: spark/site/images/spark-runs-everywhere.png URL: http://svn.apache.org/viewvc/spark/site/images/spark-runs-everywhere.png?rev=1747764&r1=1747763&r2=1747764&view=diff == Binary files - no diff available. 
svn commit: r1747763 - in /spark/images: spark-logo-reverse.eps spark-logo-trademark.png spark-logo.eps spark-logo.png spark-runs-everywhere.png
Author: matei Date: Fri Jun 10 18:15:35 2016 New Revision: 1747763 URL: http://svn.apache.org/viewvc?rev=1747763&view=rev Log: Version of logo with Apache Modified: spark/images/spark-logo-reverse.eps spark/images/spark-logo-trademark.png spark/images/spark-logo.eps spark/images/spark-logo.png spark/images/spark-runs-everywhere.png Modified: spark/images/spark-logo-reverse.eps URL: http://svn.apache.org/viewvc/spark/images/spark-logo-reverse.eps?rev=1747763&r1=1747762&r2=1747763&view=diff == Binary files - no diff available. Modified: spark/images/spark-logo-trademark.png URL: http://svn.apache.org/viewvc/spark/images/spark-logo-trademark.png?rev=1747763&r1=1747762&r2=1747763&view=diff == Binary files - no diff available. Modified: spark/images/spark-logo.eps URL: http://svn.apache.org/viewvc/spark/images/spark-logo.eps?rev=1747763&r1=1747762&r2=1747763&view=diff == Binary files - no diff available. Modified: spark/images/spark-logo.png URL: http://svn.apache.org/viewvc/spark/images/spark-logo.png?rev=1747763&r1=1747762&r2=1747763&view=diff == Binary files - no diff available. Modified: spark/images/spark-runs-everywhere.png URL: http://svn.apache.org/viewvc/spark/images/spark-runs-everywhere.png?rev=1747763&r1=1747762&r2=1747763&view=diff == Binary files - no diff available.
spark git commit: [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter
Repository: spark Updated Branches: refs/heads/branch-2.0 47c2a265f -> 55a837246 [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter ## What changes were proposed in this pull request? This patch moves some code in `DataFrameWriter.insertInto` that belongs in the `Analyzer`. ## How was this patch tested? Existing tests. Author: Liang-Chi Hsieh Closes #13496 from viirya/move-analyzer-stuff. (cherry picked from commit 0ec279ffdf92853965e327a9f0f6956cacb7a23e) Signed-off-by: Cheng Lian Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55a83724 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55a83724 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55a83724 Branch: refs/heads/branch-2.0 Commit: 55a83724632aa54e49aedbab8ddd21d010eca26d Parents: 47c2a26 Author: Liang-Chi Hsieh Authored: Fri Jun 10 11:05:04 2016 -0700 Committer: Cheng Lian Committed: Fri Jun 10 11:05:14 2016 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 17 ++--- .../org/apache/spark/sql/DataFrameWriter.scala | 12 +--- .../spark/sql/hive/execution/HiveQuerySuite.scala | 4 ++-- 3 files changed, 17 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/55a83724/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4446140..a081357 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -452,6 +452,17 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => +// A partitioned relation's schema can be different from the input logicalPlan, since +// partition columns are all moved after data columns. We Project to adjust the ordering. +val input = if (parts.nonEmpty) { + val (inputPartCols, inputDataCols) = child.output.partition { attr => +parts.contains(attr.name) + } + Project(inputDataCols ++ inputPartCols, child) +} else { + child +} + val table = lookupTableFromCatalog(u) // adding the table's partitions or validate the query's partition info table match { @@ -467,8 +478,8 @@ class Analyzer( |Requested partitions: ${parts.keys.mkString(",")} |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) } - // Assume partition columns are correctly placed at the end of the child's output - i.copy(table = EliminateSubqueryAliases(table)) + // Partition columns are already correctly placed at the end of the child's output + i.copy(table = EliminateSubqueryAliases(table), child = input) } else { // Set up the table's partition scheme with all dynamic partitions by moving partition // columns to the end of the column list, in partition order.
@@ -486,7 +497,7 @@ class Analyzer( child = Project(columns ++ partColumns, child)) } case _ => -i.copy(table = EliminateSubqueryAliases(table)) +i.copy(table = EliminateSubqueryAliases(table), child = input) } case u: UnresolvedRelation => val table = u.tableIdentifier http://git-wip-us.apache.org/repos/asf/spark/blob/55a83724/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 32e2fdc..6ce59e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -505,21 +505,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap) val overwrite = mode == SaveMode.Overwrite -// A partitioned relation's schema can be different from the input logicalPlan, since -// partition columns are all moved after data columns. We Project to adjust the ordering. -// TODO: this belongs to the analyzer. -val input = normalizedParCols.map {
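A toy illustration of the reordering the relocated rule performs (column names are invented): data columns first, then partition columns, matching the physical layout of a partitioned table.

```scala
// Mirrors child.output.partition { attr => parts.contains(attr.name) }
// followed by Project(inputDataCols ++ inputPartCols, child).
val parts = Set("p1", "p2")                   // partition columns of the target table
val childOutput = Seq("c1", "p1", "c2", "p2") // attribute names of the input plan
val (inputPartCols, inputDataCols) = childOutput.partition(parts.contains)
println(inputDataCols ++ inputPartCols)       // List(c1, c2, p1, p2)
```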
spark git commit: [SPARK-15866] Rename listAccumulator collectionAccumulator
Repository: spark Updated Branches: refs/heads/master 0ec279ffd -> 254bc8c34 [SPARK-15866] Rename listAccumulator collectionAccumulator ## What changes were proposed in this pull request? SparkContext.listAccumulator, by Spark's convention, makes it sound like "list" is a verb and the method should return a list of accumulators. This patch renames the method to `collectionAccumulator` and the class to `CollectionAccumulator`. ## How was this patch tested? Updated test case to reflect the new names. Author: Reynold Xin Closes #13594 from rxin/SPARK-15866. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/254bc8c3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/254bc8c3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/254bc8c3 Branch: refs/heads/master Commit: 254bc8c34e70241508bdfc8ff42a65491f5280cd Parents: 0ec279f Author: Reynold Xin Authored: Fri Jun 10 11:08:39 2016 -0700 Committer: Reynold Xin Committed: Fri Jun 10 11:08:39 2016 -0700 -- .../main/scala/org/apache/spark/SparkContext.scala | 16 .../scala/org/apache/spark/util/AccumulatorV2.scala | 15 ++- .../org/apache/spark/util/AccumulatorV2Suite.scala | 2 +- .../execution/columnar/InMemoryTableScanExec.scala | 8 4 files changed, 23 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/254bc8c3/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 33b11ed..230fabd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1340,21 +1340,21 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** - * Create and register a list accumulator, which starts with empty list and accumulates inputs - * by adding them into the inner list. + * Create and register a [[CollectionAccumulator]], which starts with empty list and accumulates + * inputs by adding them into the list. */ - def listAccumulator[T]: ListAccumulator[T] = { -val acc = new ListAccumulator[T] + def collectionAccumulator[T]: CollectionAccumulator[T] = { +val acc = new CollectionAccumulator[T] register(acc) acc } /** - * Create and register a list accumulator, which starts with empty list and accumulates inputs - * by adding them into the inner list. + * Create and register a [[CollectionAccumulator]], which starts with empty list and accumulates + * inputs by adding them into the list. */ - def listAccumulator[T](name: String): ListAccumulator[T] = { -val acc = new ListAccumulator[T] + def collectionAccumulator[T](name: String): CollectionAccumulator[T] = { +val acc = new CollectionAccumulator[T] register(acc, name) acc } http://git-wip-us.apache.org/repos/asf/spark/blob/254bc8c3/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 0b9a47c..044dd69 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -415,15 +415,20 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { } -class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { +/** + * An [[AccumulatorV2 accumulator]] for collecting a list of elements.
+ * + * @since 2.0.0 + */ +class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { private val _list: java.util.List[T] = new ArrayList[T] override def isZero: Boolean = _list.isEmpty - override def copyAndReset(): ListAccumulator[T] = new ListAccumulator + override def copyAndReset(): CollectionAccumulator[T] = new CollectionAccumulator - override def copy(): ListAccumulator[T] = { -val newAcc = new ListAccumulator[T] + override def copy(): CollectionAccumulator[T] = { +val newAcc = new CollectionAccumulator[T] newAcc._list.addAll(_list) newAcc } @@ -433,7 +438,7 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { override def add(v: T): Unit = _list.add(v) override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match { -case o: ListAccumulator[T] => _list.addAll(o.value) +case o: CollectionAccumulator[T] => _list.addAll(o.value) case _ => throw new UnsupportedOperationException( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
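A usage sketch of the renamed API, assuming a running `SparkContext` named `sc` (the job contents are illustrative):

```scala
// Collects elements from tasks into a java.util.List on the driver.
val warnings = sc.collectionAccumulator[String]("warnings")
sc.parallelize(1 to 100).foreach { i =>
  if (i % 10 == 0) warnings.add(s"saw $i")
}
println(warnings.value) // java.util.List[String] with the ten collected entries
```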
spark git commit: [SPARK-15866] Rename listAccumulator collectionAccumulator
Repository: spark Updated Branches: refs/heads/branch-2.0 55a837246 -> 935b6e0e4 [SPARK-15866] Rename listAccumulator collectionAccumulator ## What changes were proposed in this pull request? SparkContext.listAccumulator, by Spark's convention, makes it sound like "list" is a verb and the method should return a list of accumulators. This patch renames the method to `collectionAccumulator` and the class to `CollectionAccumulator`. ## How was this patch tested? Updated test case to reflect the new names. Author: Reynold Xin Closes #13594 from rxin/SPARK-15866. (cherry picked from commit 254bc8c34e70241508bdfc8ff42a65491f5280cd) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/935b6e0e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/935b6e0e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/935b6e0e Branch: refs/heads/branch-2.0 Commit: 935b6e0e48e258f447622033b512f7ba5d83da69 Parents: 55a8372 Author: Reynold Xin Authored: Fri Jun 10 11:08:39 2016 -0700 Committer: Reynold Xin Committed: Fri Jun 10 11:08:47 2016 -0700 -- .../main/scala/org/apache/spark/SparkContext.scala | 16 .../scala/org/apache/spark/util/AccumulatorV2.scala | 15 ++- .../org/apache/spark/util/AccumulatorV2Suite.scala | 2 +- .../execution/columnar/InMemoryTableScanExec.scala | 8 4 files changed, 23 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/935b6e0e/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 33b11ed..230fabd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1340,21 +1340,21 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** - * Create and register a list accumulator, which starts with empty list and accumulates inputs - * by adding them into the inner list. + * Create and register a [[CollectionAccumulator]], which starts with empty list and accumulates + * inputs by adding them into the list. */ - def listAccumulator[T]: ListAccumulator[T] = { -val acc = new ListAccumulator[T] + def collectionAccumulator[T]: CollectionAccumulator[T] = { +val acc = new CollectionAccumulator[T] register(acc) acc } /** - * Create and register a list accumulator, which starts with empty list and accumulates inputs - * by adding them into the inner list. + * Create and register a [[CollectionAccumulator]], which starts with empty list and accumulates + * inputs by adding them into the list.
*/ - def listAccumulator[T](name: String): ListAccumulator[T] = { -val acc = new ListAccumulator[T] + def collectionAccumulator[T](name: String): CollectionAccumulator[T] = { +val acc = new CollectionAccumulator[T] register(acc, name) acc } http://git-wip-us.apache.org/repos/asf/spark/blob/935b6e0e/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 0b9a47c..044dd69 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -415,15 +415,20 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { } -class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { +/** + * An [[AccumulatorV2 accumulator]] for collecting a list of elements. + * + * @since 2.0.0 + */ +class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { private val _list: java.util.List[T] = new ArrayList[T] override def isZero: Boolean = _list.isEmpty - override def copyAndReset(): ListAccumulator[T] = new ListAccumulator + override def copyAndReset(): CollectionAccumulator[T] = new CollectionAccumulator - override def copy(): ListAccumulator[T] = { -val newAcc = new ListAccumulator[T] + override def copy(): CollectionAccumulator[T] = { +val newAcc = new CollectionAccumulator[T] newAcc._list.addAll(_list) newAcc } @@ -433,7 +438,7 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { override def add(v: T): Unit = _list.add(v) override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match { -case o: ListAccumulator[T] => _list.addAll(o.value) +case o: CollectionAccumulator[T] => _list.addAll(o.value) case _ => throw new UnsupportedOperationException( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
spark git commit: [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter
Repository: spark Updated Branches: refs/heads/master abdb5d42c -> 0ec279ffd [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter ## What changes were proposed in this pull request? This patch moves some code in `DataFrameWriter.insertInto` that belongs in the `Analyzer`. ## How was this patch tested? Existing tests. Author: Liang-Chi Hsieh Closes #13496 from viirya/move-analyzer-stuff. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ec279ff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ec279ff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ec279ff Branch: refs/heads/master Commit: 0ec279ffdf92853965e327a9f0f6956cacb7a23e Parents: abdb5d4 Author: Liang-Chi Hsieh Authored: Fri Jun 10 11:05:04 2016 -0700 Committer: Cheng Lian Committed: Fri Jun 10 11:05:04 2016 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 17 ++--- .../org/apache/spark/sql/DataFrameWriter.scala | 12 +--- .../spark/sql/hive/execution/HiveQuerySuite.scala | 4 ++-- 3 files changed, 17 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0ec279ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d1ca99f..58f3904 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -452,6 +452,17 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => +// A partitioned relation's schema can be different from the input logicalPlan, since +// partition columns are all moved after data columns. We Project to adjust the ordering. +val input = if (parts.nonEmpty) { + val (inputPartCols, inputDataCols) = child.output.partition { attr => +parts.contains(attr.name) + } + Project(inputDataCols ++ inputPartCols, child) +} else { + child +} + val table = lookupTableFromCatalog(u) // adding the table's partitions or validate the query's partition info table match { @@ -467,8 +478,8 @@ class Analyzer( |Requested partitions: ${parts.keys.mkString(",")} |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) } - // Assume partition columns are correctly placed at the end of the child's output - i.copy(table = EliminateSubqueryAliases(table)) + // Partition columns are already correctly placed at the end of the child's output - i.copy(table = EliminateSubqueryAliases(table), child = input) } else { // Set up the table's partition scheme with all dynamic partitions by moving partition // columns to the end of the column list, in partition order.
@@ -486,7 +497,7 @@ class Analyzer( child = Project(columns ++ partColumns, child)) } case _ => -i.copy(table = EliminateSubqueryAliases(table)) +i.copy(table = EliminateSubqueryAliases(table), child = input) } case u: UnresolvedRelation => val table = u.tableIdentifier http://git-wip-us.apache.org/repos/asf/spark/blob/0ec279ff/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 32e2fdc..6ce59e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -505,21 +505,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap) val overwrite = mode == SaveMode.Overwrite -// A partitioned relation's schema can be different from the input logicalPlan, since -// partition columns are all moved after data columns. We Project to adjust the ordering. -// TODO: this belongs to the analyzer. -val input = normalizedParCols.map { parCols => - val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr => -
spark git commit: [SPARK-15812][SQ][STREAMING] Added support for sorting after streaming aggregation with complete mode
Repository: spark Updated Branches: refs/heads/branch-2.0 54b4763d2 -> 47c2a265f [SPARK-15812][SQ][STREAMING] Added support for sorting after streaming aggregation with complete mode ## What changes were proposed in this pull request? When the output mode is Complete, the output of a streaming aggregation will contain the complete aggregates every time, so it is no different from a batch dataset within an incremental execution. Other non-streaming operations should therefore be supported on this dataset. In this PR, I am just adding support for sorting, as it is a commonly useful operation. Support for other operations will come later. ## How was this patch tested? Additional unit tests. Author: Tathagata Das Closes #13549 from tdas/SPARK-15812. (cherry picked from commit abdb5d42c5802c8f60876aa1285c803d02881258) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47c2a265 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47c2a265 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47c2a265 Branch: refs/heads/branch-2.0 Commit: 47c2a265fbdb91cf5684f0d6342869ca08cb2d27 Parents: 54b4763 Author: Tathagata Das Authored: Fri Jun 10 10:48:28 2016 -0700 Committer: Tathagata Das Committed: Fri Jun 10 10:48:38 2016 -0700 -- .../analysis/UnsupportedOperationChecker.scala | 61 .../analysis/UnsupportedOperationsSuite.scala | 17 +- .../apache/spark/sql/streaming/StreamTest.scala | 24 +--- .../streaming/StreamingAggregationSuite.scala | 25 4 files changed, 95 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/47c2a265/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 8373fa3..689e016 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -43,6 +43,41 @@ object UnsupportedOperationChecker { "Queries without streaming sources cannot be executed with write.startStream()")(plan) } +// Disallow multiple streaming aggregations +val aggregates = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } + +if (aggregates.size > 1) { + throwError( +"Multiple streaming aggregations are not supported with " + + "streaming DataFrames/Datasets")(plan) +} + +// Disallow some output mode +outputMode match { + case InternalOutputModes.Append if aggregates.nonEmpty => +throwError( + s"$outputMode output mode not supported when there are streaming aggregations on " + +s"streaming DataFrames/DataSets")(plan) + + case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty => +throwError( + s"$outputMode output mode not supported when there are no streaming aggregations on " + +s"streaming DataFrames/Datasets")(plan) + + case _ => +} + +/** + * Whether the subplan will contain complete data or incremental data in every incremental + * execution. Some operations may be allowed only when the child logical plan gives complete + * data.
+ */ +def containsCompleteData(subplan: LogicalPlan): Boolean = { + val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } + // Either the subplan has no streaming source, or it has aggregation with Complete mode + !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete) +} + plan.foreachUp { implicit subPlan => // Operations that cannot exists anywhere in a streaming plan @@ -107,8 +142,9 @@ object UnsupportedOperationChecker { case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) => throwError("Limits are not supported on streaming DataFrames/Datasets") -case Sort(_, _, _) | SortPartitions(_, _) if subPlan.children.forall(_.isStreaming) => - throwError("Sorting is not supported on streaming DataFrames/Datasets") +case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) => + throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" + +"aggregated DataFrame/Dataset in Complete mode") case Sample(_, _, _, _, child) if child.isStreaming => throwError("Sampling is not supported on streaming DataFrames/Datasets")
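A sketch of what this enables, written against the 2.0-preview streaming API used elsewhere in this patch; `streamingDf` is an assumed streaming Dataset with a `word` column, and the memory sink is just one convenient choice:

```scala
import org.apache.spark.sql.functions._

// Sorting after the aggregation is allowed only because Complete mode
// re-emits the full aggregate table on every trigger.
val sortedCounts = streamingDf.groupBy("word").count().orderBy(desc("count"))

val query = sortedCounts.write
  .format("memory")
  .queryName("sorted_counts")
  .outputMode("complete")
  .startStream()
```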
spark git commit: [SPARK-15812][SQ][STREAMING] Added support for sorting after streaming aggregation with complete mode
Repository: spark Updated Branches: refs/heads/master cdd7f5a57 -> abdb5d42c [SPARK-15812][SQ][STREAMING] Added support for sorting after streaming aggregation with complete mode ## What changes were proposed in this pull request? When the output mode is Complete, the output of a streaming aggregation will contain the complete aggregates every time, so it is no different from a batch dataset within an incremental execution. Other non-streaming operations should therefore be supported on this dataset. In this PR, I am just adding support for sorting, as it is a commonly useful operation. Support for other operations will come later. ## How was this patch tested? Additional unit tests. Author: Tathagata Das Closes #13549 from tdas/SPARK-15812. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abdb5d42 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abdb5d42 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abdb5d42 Branch: refs/heads/master Commit: abdb5d42c5802c8f60876aa1285c803d02881258 Parents: cdd7f5a Author: Tathagata Das Authored: Fri Jun 10 10:48:28 2016 -0700 Committer: Tathagata Das Committed: Fri Jun 10 10:48:28 2016 -0700 -- .../analysis/UnsupportedOperationChecker.scala | 61 .../analysis/UnsupportedOperationsSuite.scala | 17 +- .../apache/spark/sql/streaming/StreamTest.scala | 24 +--- .../streaming/StreamingAggregationSuite.scala | 25 4 files changed, 95 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/abdb5d42/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 8373fa3..689e016 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -43,6 +43,41 @@ object UnsupportedOperationChecker { "Queries without streaming sources cannot be executed with write.startStream()")(plan) } +// Disallow multiple streaming aggregations +val aggregates = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } + +if (aggregates.size > 1) { + throwError( +"Multiple streaming aggregations are not supported with " + + "streaming DataFrames/Datasets")(plan) +} + +// Disallow some output mode +outputMode match { + case InternalOutputModes.Append if aggregates.nonEmpty => +throwError( + s"$outputMode output mode not supported when there are streaming aggregations on " + +s"streaming DataFrames/DataSets")(plan) + + case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty => +throwError( + s"$outputMode output mode not supported when there are no streaming aggregations on " + +s"streaming DataFrames/Datasets")(plan) + + case _ => +} + +/** + * Whether the subplan will contain complete data or incremental data in every incremental + * execution. Some operations may be allowed only when the child logical plan gives complete + * data.
+ */ +def containsCompleteData(subplan: LogicalPlan): Boolean = { + val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } + // Either the subplan has no streaming source, or it has aggregation with Complete mode + !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete) +} + plan.foreachUp { implicit subPlan => // Operations that cannot exists anywhere in a streaming plan @@ -107,8 +142,9 @@ object UnsupportedOperationChecker { case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) => throwError("Limits are not supported on streaming DataFrames/Datasets") -case Sort(_, _, _) | SortPartitions(_, _) if subPlan.children.forall(_.isStreaming) => - throwError("Sorting is not supported on streaming DataFrames/Datasets") +case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) => + throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" + +"aggregated DataFrame/Dataset in Complete mode") case Sample(_, _, _, _, child) if child.isStreaming => throwError("Sampling is not supported on streaming DataFrames/Datasets")
spark git commit: [SPARK-15837][ML][PYSPARK] Word2vec python add maxsentence parameter
Repository: spark Updated Branches: refs/heads/branch-2.0 6709ce1ae -> 54b4763d2 [SPARK-15837][ML][PYSPARK] Word2vec python add maxsentence parameter ## What changes were proposed in this pull request? Adds the maxSentenceLength parameter to Word2Vec in the Python ML API. ## How was this patch tested? Existing tests. Author: WeichenXu Closes #13578 from WeichenXu123/word2vec_python_add_maxsentence. (cherry picked from commit cdd7f5a57a21d4a8f93456d149f65859c96190cf) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54b4763d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54b4763d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54b4763d Branch: refs/heads/branch-2.0 Commit: 54b4763d295d6aeab6105d0430470343dd4ca3a3 Parents: 6709ce1 Author: WeichenXu Authored: Fri Jun 10 12:26:53 2016 +0100 Committer: Sean Owen Committed: Fri Jun 10 12:27:04 2016 +0100 -- python/pyspark/ml/feature.py | 29 - 1 file changed, 24 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/54b4763d/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index ebe1300..bfb2fb7 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -2244,28 +2244,33 @@ class Word2Vec(JavaEstimator, HasStepSize, HasMaxIter, HasSeed, HasInputCol, Has windowSize = Param(Params._dummy(), "windowSize", "the window size (context words from [-window, window]). Default value is 5", typeConverter=TypeConverters.toInt) +maxSentenceLength = Param(Params._dummy(), "maxSentenceLength", + "Maximum length (in words) of each sentence in the input data. " + + "Any sentence longer than this threshold will " + + "be divided into chunks up to the size.", + typeConverter=TypeConverters.toInt) @keyword_only def __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, - seed=None, inputCol=None, outputCol=None, windowSize=5): + seed=None, inputCol=None, outputCol=None, windowSize=5, maxSentenceLength=1000): """ __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, \ - seed=None, inputCol=None, outputCol=None, windowSize=5) + seed=None, inputCol=None, outputCol=None, windowSize=5, maxSentenceLength=1000) """ super(Word2Vec, self).__init__() self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.Word2Vec", self.uid) self._setDefault(vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, - seed=None, windowSize=5) + seed=None, windowSize=5, maxSentenceLength=1000) kwargs = self.__init__._input_kwargs self.setParams(**kwargs) @keyword_only @since("1.4.0") def setParams(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, - seed=None, inputCol=None, outputCol=None, windowSize=5): + seed=None, inputCol=None, outputCol=None, windowSize=5, maxSentenceLength=1000): """ setParams(self, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, seed=None, \ - inputCol=None, outputCol=None, windowSize=5) + inputCol=None, outputCol=None, windowSize=5, maxSentenceLength=1000) Sets params for this Word2Vec. """ kwargs = self.setParams._input_kwargs @@ -2327,6 +2332,20 @@ class Word2Vec(JavaEstimator, HasStepSize, HasMaxIter, HasSeed, HasInputCol, Has """ return self.getOrDefault(self.windowSize) +@since("2.0.0") +def setMaxSentenceLength(self, value): +""" +Sets the value of :py:attr:`maxSentenceLength`.
+""" +return self._set(maxSentenceLength=value) + +@since("2.0.0") +def getMaxSentenceLength(self): +""" +Gets the value of maxSentenceLength or its default value. +""" +return self.getOrDefault(self.maxSentenceLength) + def _create_model(self, java_model): return Word2VecModel(java_model) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15837][ML][PYSPARK] Word2vec python add maxsentence parameter
Repository: spark Updated Branches: refs/heads/master 16ca32eac -> cdd7f5a57 [SPARK-15837][ML][PYSPARK] Word2vec python add maxsentence parameter ## What changes were proposed in this pull request? Word2vec python add maxsentence parameter. ## How was this patch tested? Existing test. Author: WeichenXu Closes #13578 from WeichenXu123/word2vec_python_add_maxsentence. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cdd7f5a5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cdd7f5a5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cdd7f5a5 Branch: refs/heads/master Commit: cdd7f5a57a21d4a8f93456d149f65859c96190cf Parents: 16ca32e Author: WeichenXu Authored: Fri Jun 10 12:26:53 2016 +0100 Committer: Sean Owen Committed: Fri Jun 10 12:26:53 2016 +0100 -- python/pyspark/ml/feature.py | 29 - 1 file changed, 24 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cdd7f5a5/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index ebe1300..bfb2fb7 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -2244,28 +2244,33 @@ class Word2Vec(JavaEstimator, HasStepSize, HasMaxIter, HasSeed, HasInputCol, Has windowSize = Param(Params._dummy(), "windowSize", "the window size (context words from [-window, window]). Default value is 5", typeConverter=TypeConverters.toInt) +maxSentenceLength = Param(Params._dummy(), "maxSentenceLength", + "Maximum length (in words) of each sentence in the input data. " + + "Any sentence longer than this threshold will " + + "be divided into chunks up to the size.", + typeConverter=TypeConverters.toInt) @keyword_only def __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, - seed=None, inputCol=None, outputCol=None, windowSize=5): + seed=None, inputCol=None, outputCol=None, windowSize=5, maxSentenceLength=1000): """ __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, \ - seed=None, inputCol=None, outputCol=None, windowSize=5) + seed=None, inputCol=None, outputCol=None, windowSize=5, maxSentenceLength=1000) """ super(Word2Vec, self).__init__() self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.Word2Vec", self.uid) self._setDefault(vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, - seed=None, windowSize=5) + seed=None, windowSize=5, maxSentenceLength=1000) kwargs = self.__init__._input_kwargs self.setParams(**kwargs) @keyword_only @since("1.4.0") def setParams(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, - seed=None, inputCol=None, outputCol=None, windowSize=5): + seed=None, inputCol=None, outputCol=None, windowSize=5, maxSentenceLength=1000): """ setParams(self, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, seed=None, \ - inputCol=None, outputCol=None, windowSize=5) + inputCol=None, outputCol=None, windowSize=5, maxSentenceLength=1000) Sets params for this Word2Vec. """ kwargs = self.setParams._input_kwargs @@ -2327,6 +2332,20 @@ class Word2Vec(JavaEstimator, HasStepSize, HasMaxIter, HasSeed, HasInputCol, Has """ return self.getOrDefault(self.windowSize) +@since("2.0.0") +def setMaxSentenceLength(self, value): +""" +Sets the value of :py:attr:`maxSentenceLength`. +""" +return self._set(maxSentenceLength=value) + +@since("2.0.0") +def getMaxSentenceLength(self): +""" +Gets the value of maxSentenceLength or its default value. 
+""" +return self.getOrDefault(self.maxSentenceLength) + def _create_model(self, java_model): return Word2VecModel(java_model) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15823][PYSPARK][ML] Add @property for 'accuracy' in MulticlassMetrics
Repository: spark Updated Branches: refs/heads/branch-2.0 84a8421e5 -> 6709ce1ae [SPARK-15823][PYSPARK][ML] Add @property for 'accuracy' in MulticlassMetrics ## What changes were proposed in this pull request? `accuracy` should be decorated with `property` to keep it in step with the other members of `pyspark.MulticlassMetrics`, such as `weightedPrecision`, `weightedRecall`, etc. ## How was this patch tested? manual tests Author: Zheng RuiFeng Closes #13560 from zhengruifeng/add_accuracy_property. (cherry picked from commit 16ca32eace39c423224b0ec25922038fd45c501a) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6709ce1a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6709ce1a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6709ce1a Branch: refs/heads/branch-2.0 Commit: 6709ce1aea4a8d7438722f48fd7f2ed0fc7fa5be Parents: 84a8421 Author: Zheng RuiFeng Authored: Fri Jun 10 10:09:19 2016 +0100 Committer: Sean Owen Committed: Fri Jun 10 10:09:29 2016 +0100 -- python/pyspark/mllib/evaluation.py | 7 ++- 1 file changed, 2 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6709ce1a/python/pyspark/mllib/evaluation.py -- diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py index 2eaac87..fc2a0b3 100644 --- a/python/pyspark/mllib/evaluation.py +++ b/python/pyspark/mllib/evaluation.py @@ -179,11 +179,7 @@ class MulticlassMetrics(JavaModelWrapper): 1.0... >>> metrics.fMeasure(0.0, 2.0) 0.52... ->>> metrics.precision() -0.66... ->>> metrics.recall() -0.66... ->>> metrics.accuracy() +>>> metrics.accuracy 0.66... >>> metrics.weightedFalsePositiveRate 0.19... @@ -273,6 +269,7 @@ class MulticlassMetrics(JavaModelWrapper): else: return self.call("fMeasure", label, beta) +@property @since('2.0.0') def accuracy(self): """
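For readers following along, a minimal caller-side sketch of the change, assuming Spark 2.0+ and an existing SparkContext named `sc`; the prediction/label pairs are illustrative only:

```python
from pyspark.mllib.evaluation import MulticlassMetrics

# Illustrative RDD of (prediction, label) pairs.
prediction_and_labels = sc.parallelize(
    [(0.0, 0.0), (0.0, 1.0), (1.0, 1.0), (2.0, 2.0)])
metrics = MulticlassMetrics(prediction_and_labels)

# After this commit, accuracy is read as a property, consistent with
# weightedPrecision, weightedRecall, and the other summary metrics.
print(metrics.accuracy)        # 0.75 for the data above (3 of 4 correct)
print(metrics.weightedRecall)  # already a property before this change
```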
spark git commit: [SPARK-15823][PYSPARK][ML] Add @property for 'accuracy' in MulticlassMetrics
Repository: spark Updated Branches: refs/heads/master 675a73715 -> 16ca32eac [SPARK-15823][PYSPARK][ML] Add @property for 'accuracy' in MulticlassMetrics ## What changes were proposed in this pull request? `accuracy` should be decorated with `property` to keep it in step with the other members of `pyspark.MulticlassMetrics`, such as `weightedPrecision`, `weightedRecall`, etc. ## How was this patch tested? manual tests Author: Zheng RuiFeng Closes #13560 from zhengruifeng/add_accuracy_property. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16ca32ea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16ca32ea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16ca32ea Branch: refs/heads/master Commit: 16ca32eace39c423224b0ec25922038fd45c501a Parents: 675a737 Author: Zheng RuiFeng Authored: Fri Jun 10 10:09:19 2016 +0100 Committer: Sean Owen Committed: Fri Jun 10 10:09:19 2016 +0100 -- python/pyspark/mllib/evaluation.py | 7 ++- 1 file changed, 2 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/16ca32ea/python/pyspark/mllib/evaluation.py -- diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py index 2eaac87..fc2a0b3 100644 --- a/python/pyspark/mllib/evaluation.py +++ b/python/pyspark/mllib/evaluation.py @@ -179,11 +179,7 @@ class MulticlassMetrics(JavaModelWrapper): 1.0... >>> metrics.fMeasure(0.0, 2.0) 0.52... ->>> metrics.precision() -0.66... ->>> metrics.recall() -0.66... ->>> metrics.accuracy() +>>> metrics.accuracy 0.66... >>> metrics.weightedFalsePositiveRate 0.19... @@ -273,6 +269,7 @@ class MulticlassMetrics(JavaModelWrapper): else: return self.call("fMeasure", label, beta) +@property @since('2.0.0') def accuracy(self): """
spark git commit: [DOCUMENTATION] fixed groupby aggregation example for pyspark
Repository: spark Updated Branches: refs/heads/master 00c310133 -> 675a73715 [DOCUMENTATION] fixed groupby aggregation example for pyspark ## What changes were proposed in this pull request? fixing documentation for the groupby/agg example in python ## How was this patch tested? the existing example in the documentation does not contain valid syntax (missing parenthesis) and does not use a `Column` in the expression for `agg()`. After the fix, here's how I tested it:
```
In [1]: from pyspark.sql import Row

In [2]: import pyspark.sql.functions as func

In [3]: %cpaste
Pasting code; enter '--' alone on the line to stop or use Ctrl-D.
:records = [{'age': 19, 'department': 1, 'expense': 100},
:           {'age': 20, 'department': 1, 'expense': 200},
:           {'age': 21, 'department': 2, 'expense': 300},
:           {'age': 22, 'department': 2, 'expense': 300},
:           {'age': 23, 'department': 3, 'expense': 300}]
:--

In [4]: df = sqlContext.createDataFrame([Row(**d) for d in records])

In [5]: df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense")).show()
+----------+----------+--------+------------+
|department|department|max(age)|sum(expense)|
+----------+----------+--------+------------+
|         1|         1|      20|         300|
|         2|         2|      22|         600|
|         3|         3|      23|         300|
+----------+----------+--------+------------+
```
Author: Mortada Mehyar Closes #13587 from mortada/groupby_agg_doc_fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/675a7371 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/675a7371 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/675a7371 Branch: refs/heads/master Commit: 675a73715d3c8adb9d9a9dce5f76a2db5106790c Parents: 00c3101 Author: Mortada Mehyar Authored: Fri Jun 10 00:23:34 2016 -0700 Committer: Reynold Xin Committed: Fri Jun 10 00:23:34 2016 -0700 -- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/675a7371/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 940c1d7..efdf873 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -2221,7 +2221,7 @@ import pyspark.sql.functions as func # In 1.3.x, in order for the grouping column "department" to show up, # it must be included explicitly as part of the agg function call. -df.groupBy("department").agg("department"), func.max("age"), func.sum("expense")) +df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense")) # In 1.4+, grouping column "department" is included automatically. df.groupBy("department").agg(func.max("age"), func.sum("expense"))
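For completeness, the 1.4+ form mentioned at the end of the guide can be sketched the same way, assuming the same illustrative `df` and `func` import as in the test session above; the grouping column is included automatically, so it appears only once:

```python
df.groupBy("department").agg(func.max("age"), func.sum("expense")).show()
# Expected shape of the output for the records above (row order may vary):
# +----------+--------+------------+
# |department|max(age)|sum(expense)|
# +----------+--------+------------+
# |         1|      20|         300|
# |         2|      22|         600|
# |         3|      23|         300|
# +----------+--------+------------+
```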
spark git commit: [DOCUMENTATION] fixed groupby aggregation example for pyspark
Repository: spark Updated Branches: refs/heads/branch-2.0 02ed7b536 -> 84a8421e5 [DOCUMENTATION] fixed groupby aggregation example for pyspark ## What changes were proposed in this pull request? fixing documentation for the groupby/agg example in python ## How was this patch tested? the existing example in the documentation does not contain valid syntax (missing parenthesis) and does not use a `Column` in the expression for `agg()`. After the fix, here's how I tested it:
```
In [1]: from pyspark.sql import Row

In [2]: import pyspark.sql.functions as func

In [3]: %cpaste
Pasting code; enter '--' alone on the line to stop or use Ctrl-D.
:records = [{'age': 19, 'department': 1, 'expense': 100},
:           {'age': 20, 'department': 1, 'expense': 200},
:           {'age': 21, 'department': 2, 'expense': 300},
:           {'age': 22, 'department': 2, 'expense': 300},
:           {'age': 23, 'department': 3, 'expense': 300}]
:--

In [4]: df = sqlContext.createDataFrame([Row(**d) for d in records])

In [5]: df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense")).show()
+----------+----------+--------+------------+
|department|department|max(age)|sum(expense)|
+----------+----------+--------+------------+
|         1|         1|      20|         300|
|         2|         2|      22|         600|
|         3|         3|      23|         300|
+----------+----------+--------+------------+
```
Author: Mortada Mehyar Closes #13587 from mortada/groupby_agg_doc_fix. (cherry picked from commit 675a73715d3c8adb9d9a9dce5f76a2db5106790c) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84a8421e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84a8421e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84a8421e Branch: refs/heads/branch-2.0 Commit: 84a8421e5cd5756cffb3d796117149c413204264 Parents: 02ed7b5 Author: Mortada Mehyar Authored: Fri Jun 10 00:23:34 2016 -0700 Committer: Reynold Xin Committed: Fri Jun 10 00:23:41 2016 -0700 -- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/84a8421e/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 940c1d7..efdf873 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -2221,7 +2221,7 @@ import pyspark.sql.functions as func # In 1.3.x, in order for the grouping column "department" to show up, # it must be included explicitly as part of the agg function call. -df.groupBy("department").agg("department"), func.max("age"), func.sum("expense")) +df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense")) # In 1.4+, grouping column "department" is included automatically. df.groupBy("department").agg(func.max("age"), func.sum("expense"))
spark git commit: [DOCUMENTATION] fixed groupby aggregation example for pyspark
Repository: spark Updated Branches: refs/heads/branch-1.6 739d992f0 -> 393f4ba15 [DOCUMENTATION] fixed groupby aggregation example for pyspark ## What changes were proposed in this pull request? fixing documentation for the groupby/agg example in python ## How was this patch tested? the existing example in the documentation does not contain valid syntax (missing parenthesis) and does not use a `Column` in the expression for `agg()`. After the fix, here's how I tested it:
```
In [1]: from pyspark.sql import Row

In [2]: import pyspark.sql.functions as func

In [3]: %cpaste
Pasting code; enter '--' alone on the line to stop or use Ctrl-D.
:records = [{'age': 19, 'department': 1, 'expense': 100},
:           {'age': 20, 'department': 1, 'expense': 200},
:           {'age': 21, 'department': 2, 'expense': 300},
:           {'age': 22, 'department': 2, 'expense': 300},
:           {'age': 23, 'department': 3, 'expense': 300}]
:--

In [4]: df = sqlContext.createDataFrame([Row(**d) for d in records])

In [5]: df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense")).show()
+----------+----------+--------+------------+
|department|department|max(age)|sum(expense)|
+----------+----------+--------+------------+
|         1|         1|      20|         300|
|         2|         2|      22|         600|
|         3|         3|      23|         300|
+----------+----------+--------+------------+
```
Author: Mortada Mehyar Closes #13587 from mortada/groupby_agg_doc_fix. (cherry picked from commit 675a73715d3c8adb9d9a9dce5f76a2db5106790c) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/393f4ba1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/393f4ba1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/393f4ba1 Branch: refs/heads/branch-1.6 Commit: 393f4ba1516af47388e72310aee8dbbea9652134 Parents: 739d992 Author: Mortada Mehyar Authored: Fri Jun 10 00:23:34 2016 -0700 Committer: Reynold Xin Committed: Fri Jun 10 00:23:49 2016 -0700 -- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/393f4ba1/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 803701e..26511b5 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -2248,7 +2248,7 @@ import pyspark.sql.functions as func # In 1.3.x, in order for the grouping column "department" to show up, # it must be included explicitly as part of the agg function call. -df.groupBy("department").agg("department"), func.max("age"), func.sum("expense")) +df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense")) # In 1.4+, grouping column "department" is included automatically. df.groupBy("department").agg(func.max("age"), func.sum("expense"))
spark git commit: [SPARK-15593][SQL] Add DataFrameWriter.foreach to allow the user consuming data in ContinuousQuery
Repository: spark Updated Branches: refs/heads/branch-2.0 1371d5ece -> 02ed7b536 [SPARK-15593][SQL] Add DataFrameWriter.foreach to allow the user consuming data in ContinuousQuery ## What changes were proposed in this pull request? * Add DataFrameWriter.foreach to allow the user to consume data in a ContinuousQuery * ForeachWriter is the interface through which the user consumes partitions of data * Add a type parameter T to DataFrameWriter Usage:
```Scala
val ds = spark.readStream.format(...).load().as[String]
ds.write
  .queryName(...)
  .option("checkpointLocation", ...)
  .foreach(new ForeachWriter[String] {

    override def open(partitionId: Long, version: Long): Boolean = {
      // prepare some resources for a partition
      // check `version` if possible and return `false` to skip the data
      // processing if this is duplicated data
    }

    override def process(value: String): Unit = {
      // process data
    }

    override def close(errorOrNull: Throwable): Unit = {
      // release resources for a partition
      // check `errorOrNull` and handle the error if necessary
    }
  })
```
## How was this patch tested? New unit tests. Author: Shixiong Zhu Closes #13342 from zsxwing/foreach. (cherry picked from commit 00c310133df4f3893dd90d801168c2ab9841b102) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02ed7b53 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02ed7b53 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02ed7b53 Branch: refs/heads/branch-2.0 Commit: 02ed7b536f54f46de9b10a8a4ea79544a7a813bf Parents: 1371d5e Author: Shixiong Zhu Authored: Fri Jun 10 00:11:46 2016 -0700 Committer: Tathagata Das Committed: Fri Jun 10 00:11:56 2016 -0700 -- .../org/apache/spark/sql/DataFrameWriter.scala | 150 ++- .../scala/org/apache/spark/sql/Dataset.scala| 2 +- .../org/apache/spark/sql/ForeachWriter.scala| 105 + .../sql/execution/streaming/ForeachSink.scala | 53 +++ .../execution/streaming/ForeachSinkSuite.scala | 141 + .../spark/sql/sources/BucketedReadSuite.scala | 4 +- 6 files changed, 413 insertions(+), 42 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/02ed7b53/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 1dd8818..32e2fdc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project} import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation} import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils -import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution} +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.Utils @@ -40,7 +40,9 @@ import org.apache.spark.util.Utils * * @since 1.4.0 */ -final class DataFrameWriter private[sql](df: DataFrame) { +final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { + + private val df = ds.toDF() /** * Specifies the behavior when data or table already exists.
Options include: @@ -51,7 +53,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * * @since 1.4.0 */ - def mode(saveMode: SaveMode): DataFrameWriter = { + def mode(saveMode: SaveMode): DataFrameWriter[T] = { // mode() is used for non-continuous queries // outputMode() is used for continuous queries assertNotStreaming("mode() can only be called on non-continuous queries") @@ -68,7 +70,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * * @since 1.4.0 */ - def mode(saveMode: String): DataFrameWriter = { + def mode(saveMode: String): DataFrameWriter[T] = { // mode() is used for non-continuous queries // outputMode() is used for continuous queries assertNotStreaming("mode() can only be called on non-continuous queries") @@ -93,7 +95,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * @since 2.0.0 */ @Experimental - def outputMode(outputMode: OutputMode): DataFrameWriter = { + def outputMode(outputMode: OutputMod
spark git commit: [SPARK-15593][SQL] Add DataFrameWriter.foreach to allow the user consuming data in ContinuousQuery
Repository: spark Updated Branches: refs/heads/master 5a3533e77 -> 00c310133 [SPARK-15593][SQL] Add DataFrameWriter.foreach to allow the user consuming data in ContinuousQuery ## What changes were proposed in this pull request? * Add DataFrameWriter.foreach to allow the user to consume data in a ContinuousQuery * ForeachWriter is the interface through which the user consumes partitions of data * Add a type parameter T to DataFrameWriter Usage:
```Scala
val ds = spark.readStream.format(...).load().as[String]
ds.write
  .queryName(...)
  .option("checkpointLocation", ...)
  .foreach(new ForeachWriter[String] {

    override def open(partitionId: Long, version: Long): Boolean = {
      // prepare some resources for a partition
      // check `version` if possible and return `false` to skip the data
      // processing if this is duplicated data
    }

    override def process(value: String): Unit = {
      // process data
    }

    override def close(errorOrNull: Throwable): Unit = {
      // release resources for a partition
      // check `errorOrNull` and handle the error if necessary
    }
  })
```
## How was this patch tested? New unit tests. Author: Shixiong Zhu Closes #13342 from zsxwing/foreach. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00c31013 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00c31013 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00c31013 Branch: refs/heads/master Commit: 00c310133df4f3893dd90d801168c2ab9841b102 Parents: 5a3533e Author: Shixiong Zhu Authored: Fri Jun 10 00:11:46 2016 -0700 Committer: Tathagata Das Committed: Fri Jun 10 00:11:46 2016 -0700 -- .../org/apache/spark/sql/DataFrameWriter.scala | 150 ++- .../scala/org/apache/spark/sql/Dataset.scala| 2 +- .../org/apache/spark/sql/ForeachWriter.scala| 105 + .../sql/execution/streaming/ForeachSink.scala | 53 +++ .../execution/streaming/ForeachSinkSuite.scala | 141 + .../spark/sql/sources/BucketedReadSuite.scala | 4 +- 6 files changed, 413 insertions(+), 42 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/00c31013/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 1dd8818..32e2fdc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project} import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation} import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils -import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution} +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.Utils @@ -40,7 +40,9 @@ import org.apache.spark.util.Utils * * @since 1.4.0 */ -final class DataFrameWriter private[sql](df: DataFrame) { +final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { + + private val df = ds.toDF() /** * Specifies the behavior when data or table already exists.
Options include: @@ -51,7 +53,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * * @since 1.4.0 */ - def mode(saveMode: SaveMode): DataFrameWriter = { + def mode(saveMode: SaveMode): DataFrameWriter[T] = { // mode() is used for non-continuous queries // outputMode() is used for continuous queries assertNotStreaming("mode() can only be called on non-continuous queries") @@ -68,7 +70,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * * @since 1.4.0 */ - def mode(saveMode: String): DataFrameWriter = { + def mode(saveMode: String): DataFrameWriter[T] = { // mode() is used for non-continuous queries // outputMode() is used for continuous queries assertNotStreaming("mode() can only be called on non-continuous queries") @@ -93,7 +95,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * @since 2.0.0 */ @Experimental - def outputMode(outputMode: OutputMode): DataFrameWriter = { + def outputMode(outputMode: OutputMode): DataFrameWriter[T] = { assertStreaming("outputMode() can only be called on continuous queries")