Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c1390ccbb -> f15d641e2
[SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`

## What changes were proposed in this pull request?

It doesn't make sense to specify partitioning parameters when we write data out from Datasets/DataFrames into `jdbc` tables or streaming `ForeachWriter`s. This patch adds an `assertNotPartitioned` check to `DataFrameWriter`. The table below lists every `DataFrameWriter` operation; "yes" marks the ones that must reject a prior `partitionBy()` call, and a blank cell means no check is needed:

<table>
  <tr><td align="center"><strong>operation</strong></td><td align="center"><strong>should check not partitioned?</strong></td></tr>
  <tr><td align="center">mode</td><td align="center"></td></tr>
  <tr><td align="center">outputMode</td><td align="center"></td></tr>
  <tr><td align="center">trigger</td><td align="center"></td></tr>
  <tr><td align="center">format</td><td align="center"></td></tr>
  <tr><td align="center">option/options</td><td align="center"></td></tr>
  <tr><td align="center">partitionBy</td><td align="center"></td></tr>
  <tr><td align="center">bucketBy</td><td align="center"></td></tr>
  <tr><td align="center">sortBy</td><td align="center"></td></tr>
  <tr><td align="center">save</td><td align="center"></td></tr>
  <tr><td align="center">queryName</td><td align="center"></td></tr>
  <tr><td align="center">startStream</td><td align="center"></td></tr>
  <tr><td align="center">foreach</td><td align="center">yes</td></tr>
  <tr><td align="center">insertInto</td><td align="center"></td></tr>
  <tr><td align="center">saveAsTable</td><td align="center"></td></tr>
  <tr><td align="center">jdbc</td><td align="center">yes</td></tr>
  <tr><td align="center">json</td><td align="center"></td></tr>
  <tr><td align="center">parquet</td><td align="center"></td></tr>
  <tr><td align="center">orc</td><td align="center"></td></tr>
  <tr><td align="center">text</td><td align="center"></td></tr>
  <tr><td align="center">csv</td><td align="center"></td></tr>
</table>

## How was this patch tested?

New dedicated tests.

Author: Liwei Lin <lwl...@gmail.com>

Closes #13597 from lw-lin/add-assertNotPartitioned.
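To make the new behavior concrete, here is a minimal sketch (not part of the patch) of the batch-side check. It assumes a local `SparkSession`; the JDBC URL and table name are placeholders and are never actually used, because the new `assertNotPartitioned("jdbc")` guard throws before any connection is attempted:

```scala
import java.util.Properties

import org.apache.spark.sql.{AnalysisException, SparkSession}

object JdbcGuardExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("assertNotPartitioned-demo")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")

    try {
      // A JDBC table has no directory layout, so partitionBy() is meaningless here;
      // after this patch the write fails fast with an AnalysisException instead of
      // silently ignoring the partitioning columns.
      df.write.partitionBy("i").jdbc("jdbc:h2:mem:demo", "demo_table", new Properties())
    } catch {
      case e: AnalysisException =>
        println(e.getMessage) // mentions both 'jdbc' and 'partitioning'
    }
    spark.stop()
  }
}
```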
(cherry picked from commit fb219029dd1b8d2783c3e202361401048296595c)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f15d641e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f15d641e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f15d641e

Branch: refs/heads/branch-2.0
Commit: f15d641e297d425a8c1b4ba6c93f4f98a3f70d0f
Parents: c1390cc
Author: Liwei Lin <lwl...@gmail.com>
Authored: Fri Jun 10 13:01:29 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Jun 10 13:01:37 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 12 +++++-
 .../test/DataFrameReaderWriterSuite.scala       | 42 ++++++++++++++++++--
 .../spark/sql/sources/BucketedWriteSuite.scala  |  8 ++--
 3 files changed, 52 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6ce59e8..78b74f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -432,6 +432,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   @Experimental
   def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+    assertNotPartitioned("foreach")
     assertNotBucketed("foreach")
     assertStreaming(
       "foreach() can only be called on streaming Datasets/DataFrames.")
@@ -562,8 +563,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private def assertNotBucketed(operation: String): Unit = {
     if (numBuckets.isDefined || sortColumnNames.isDefined) {
-      throw new IllegalArgumentException(
-        s"'$operation' does not support bucketing right now.")
+      throw new AnalysisException(s"'$operation' does not support bucketing right now")
+    }
+  }
+
+  private def assertNotPartitioned(operation: String): Unit = {
+    if (partitioningColumns.isDefined) {
+      throw new AnalysisException(s"'$operation' does not support partitioning")
     }
   }
 
@@ -646,6 +652,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
+    assertNotPartitioned("jdbc")
+    assertNotBucketed("jdbc")
     assertNotStreaming("jdbc() can only be called on non-continuous queries")
 
     val props = new Properties()

http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index bf6063a..6e0d66a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -455,8 +455,8 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
-    val e = intercept[IllegalArgumentException](w.bucketBy(1, "text").startStream())
-    assert(e.getMessage == "'startStream' does not support bucketing right now.")
+    val e = intercept[AnalysisException](w.bucketBy(1, "text").startStream())
+    assert(e.getMessage == "'startStream' does not support bucketing right now;")
   }
 
   test("check sortBy() can only be called on non-continuous queries;") {
@@ -464,8 +464,8 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
-    val e = intercept[IllegalArgumentException](w.sortBy("text").startStream())
-    assert(e.getMessage == "'startStream' does not support bucketing right now.")
+    val e = intercept[AnalysisException](w.sortBy("text").startStream())
+    assert(e.getMessage == "'startStream' does not support bucketing right now;")
   }
 
   test("check save(path) can only be called on non-continuous queries") {
@@ -558,6 +558,40 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
     assert(e.getMessage == "csv() can only be called on non-continuous queries;")
   }
 
+  test("check foreach() does not support partitioning or bucketing") {
+    val df = spark.read
+      .format("org.apache.spark.sql.streaming.test")
+      .stream()
+
+    var w = df.write.partitionBy("value")
+    var e = intercept[AnalysisException](w.foreach(null))
+    Seq("foreach", "partitioning").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+
+    w = df.write.bucketBy(2, "value")
+    e = intercept[AnalysisException](w.foreach(null))
+    Seq("foreach", "bucketing").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+  }
+
+  test("check jdbc() does not support partitioning or bucketing") {
+    val df = spark.read.text(newTextInput)
+
+    var w = df.write.partitionBy("value")
+    var e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("jdbc", "partitioning").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+
+    w = df.write.bucketBy(2, "value")
+    e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("jdbc", "bucketing").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+  }
+
   test("ConsoleSink can be correctly loaded") {
     LastOptions.clear()
 
     val df = spark.read

http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 61a281d..9974451 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -62,19 +62,19 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
 
   test("write bucketed data using save()") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
 
-    val e = intercept[IllegalArgumentException] {
+    val e = intercept[AnalysisException] {
       df.write.bucketBy(2, "i").parquet("/tmp/path")
     }
-    assert(e.getMessage == "'save' does not support bucketing right now.")
+    assert(e.getMessage == "'save' does not support bucketing right now;")
   }
 
   test("write bucketed data using insertInto()") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
 
-    val e = intercept[IllegalArgumentException] {
+    val e = intercept[AnalysisException] {
       df.write.bucketBy(2, "i").insertInto("tt")
     }
-    assert(e.getMessage == "'insertInto' does not support bucketing right now.")
+    assert(e.getMessage == "'insertInto' does not support bucketing right now;")
   }
 
   private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
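The streaming-side guard works the same way. Below is a hedged sketch against the pre-2.0-final API used on this branch (streaming writes still go through `df.write`, and `MemoryStream` is the internal in-memory source Spark's own streaming tests use, so both are assumptions tied to this snapshot); passing `null` as the writer mirrors the new test and is safe because `assertNotPartitioned("foreach")` throws before the writer is ever touched:

```scala
import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object ForeachGuardExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("foreach-guard-demo")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    // MemoryStream produces a streaming Dataset; its single Int column is named "value".
    val input = MemoryStream[Int]
    val streamingDf = input.toDF()

    try {
      // partitionBy() has no meaning for a ForeachWriter sink; the guard rejects it
      // eagerly, so the null writer is never used (the patch's own test does the same).
      streamingDf.write.partitionBy("value").foreach(null)
    } catch {
      case e: AnalysisException =>
        println(e.getMessage) // mentions both 'foreach' and 'partitioning'
    }
    spark.stop()
  }
}
```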