spark git commit: [SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c1390ccbb -> f15d641e2


[SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`

## What changes were proposed in this pull request?

It doesn't make sense to specify partitioning parameters when we write data out from Datasets/DataFrames into `jdbc` tables or streaming `ForeachWriter`s. This patch adds an `assertNotPartitioned` check in `DataFrameWriter`.

operation      | should check "not partitioned"?
---------------|--------------------------------
mode           |
outputMode     |
trigger        |
format         |
option/options |
partitionBy    |
bucketBy       |
sortBy         |
save           |
queryName      |
startStream    |
foreach        | yes
insertInto     |
saveAsTable    |
jdbc           | yes
json           |
parquet        |
orc            |
text           |
csv            |

## How was this patch tested?

New dedicated tests.

Author: Liwei Lin

Closes #13597 from lw-lin/add-assertNotPartitioned.

(cherry picked from commit fb219029dd1b8d2783c3e202361401048296595c)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f15d641e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f15d641e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f15d641e

Branch: refs/heads/branch-2.0
Commit: f15d641e297d425a8c1b4ba6c93f4f98a3f70d0f
Parents: c1390cc
Author: Liwei Lin
Authored: Fri Jun 10 13:01:29 2016 -0700
Committer: Shixiong Zhu
Committed: Fri Jun 10 13:01:37 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 12 +-
 .../test/DataFrameReaderWriterSuite.scala       | 42 ++--
 .../spark/sql/sources/BucketedWriteSuite.scala  |  8 ++--
 3 files changed, 52 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6ce59e8..78b74f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -432,6 +432,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
  @Experimental
  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+    assertNotPartitioned("foreach")
    assertNotBucketed("foreach")
    assertStreaming(
      "foreach() can only be called on streaming Datasets/DataFrames.")
@@ -562,8 +563,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

  private def assertNotBucketed(operation: String): Unit = {
    if (numBuckets.isDefined || sortColumnNames.isDefined) {
-      throw new IllegalArgumentException(
-        s"'$operation' does not support bucketing right now.")
+      throw new AnalysisException(s"'$operation' does not support bucketing right now")
+    }
+  }
+
+  private def assertNotPartitioned(operation: String): Unit = {
+    if (partitioningColumns.isDefined) {
+      throw new AnalysisException(s"'$operation' does not support partitioning")
    }
  }

@@ -646,6 +652,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
+    assertNotPartitioned("jdbc")
+    assertNotBucketed("jdbc")
    assertNotStreaming("jdbc() can only be called on non-continuous queries")

    val props = new Properties()


http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index bf6063a..6e0d66a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -455,8 +455,8 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
      .format("org.apache.spark.sql.streaming.test")
      .stream()
    val w = df.write
-    val e = intercept[IllegalArgumentException](w.bucketBy(1, "text").startStream())
-    assert(e.getMessage == "'startStream' does not support bucketing right now.")
+    val e = intercept[AnalysisException](w.bucketBy(1, "text").startStream())
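For context, here is a minimal, hypothetical sketch of the behavior this guard enforces. It is not part of the commit: the `SparkSession` setup, JDBC URL, table name, and DataFrame are made up for illustration, but the exception type and message come straight from `assertNotPartitioned` in the diff above.

```scala
import java.util.Properties

import org.apache.spark.sql.{AnalysisException, SparkSession}

object AssertNotPartitionedDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("assertNotPartitioned-demo")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "text")

    try {
      df.write
        .partitionBy("id") // sets partitioningColumns, which jdbc() now rejects
        .jdbc("jdbc:h2:mem:demo", "people", new Properties()) // hypothetical URL and table
    } catch {
      case e: AnalysisException =>
        // The guard fires before any JDBC connection is opened.
        println(e.getMessage) // contains "'jdbc' does not support partitioning"
    }

    spark.stop()
  }
}
```

Before this check, a `partitionBy` call was silently accepted on these code paths even though it had no effect. Failing fast with an `AnalysisException` also matches the updated `assertNotBucketed`, which is why the test above now intercepts `AnalysisException` instead of `IllegalArgumentException`.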