spark git commit: [SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`

2016-06-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c1390ccbb -> f15d641e2


[SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`

## What changes were proposed in this pull request?

It doesn't make sense to specify partitioning parameters, when we write data 
out from Datasets/DataFrames into `jdbc` tables or streaming `ForeachWriter`s.

This patch adds `assertNotPartitioned` check in `DataFrameWriter`.



operation
should check not partitioned?


mode



outputMode



trigger



format



option/options



partitionBy



bucketBy



sortBy



save



queryName



startStream



foreach
yes


insertInto



saveAsTable



jdbc
yes


json



parquet



orc



text



csv




## How was this patch tested?

New dedicated tests.

Author: Liwei Lin 

Closes #13597 from lw-lin/add-assertNotPartitioned.

(cherry picked from commit fb219029dd1b8d2783c3e202361401048296595c)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f15d641e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f15d641e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f15d641e

Branch: refs/heads/branch-2.0
Commit: f15d641e297d425a8c1b4ba6c93f4f98a3f70d0f
Parents: c1390cc
Author: Liwei Lin 
Authored: Fri Jun 10 13:01:29 2016 -0700
Committer: Shixiong Zhu 
Committed: Fri Jun 10 13:01:37 2016 -0700

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 12 +-
 .../test/DataFrameReaderWriterSuite.scala   | 42 ++--
 .../spark/sql/sources/BucketedWriteSuite.scala  |  8 ++--
 3 files changed, 52 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6ce59e8..78b74f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -432,6 +432,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
*/
   @Experimental
   def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+assertNotPartitioned("foreach")
 assertNotBucketed("foreach")
 assertStreaming(
   "foreach() can only be called on streaming Datasets/DataFrames.")
@@ -562,8 +563,13 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
   private def assertNotBucketed(operation: String): Unit = {
 if (numBuckets.isDefined || sortColumnNames.isDefined) {
-  throw new IllegalArgumentException(
-s"'$operation' does not support bucketing right now.")
+  throw new AnalysisException(s"'$operation' does not support bucketing 
right now")
+}
+  }
+
+  private def assertNotPartitioned(operation: String): Unit = {
+if (partitioningColumns.isDefined) {
+  throw new AnalysisException( s"'$operation' does not support 
partitioning")
 }
   }
 
@@ -646,6 +652,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
* @since 1.4.0
*/
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit 
= {
+assertNotPartitioned("jdbc")
+assertNotBucketed("jdbc")
 assertNotStreaming("jdbc() can only be called on non-continuous queries")
 
 val props = new Properties()

http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index bf6063a..6e0d66a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -455,8 +455,8 @@ class DataFrameReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
   .format("org.apache.spark.sql.streaming.test")
   .stream()
 val w = df.write
-val e = 

spark git commit: [SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`

2016-06-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 5c16ad0d5 -> fb219029d


[SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`

## What changes were proposed in this pull request?

It doesn't make sense to specify partitioning parameters, when we write data 
out from Datasets/DataFrames into `jdbc` tables or streaming `ForeachWriter`s.

This patch adds `assertNotPartitioned` check in `DataFrameWriter`.



operation
should check not partitioned?


mode



outputMode



trigger



format



option/options



partitionBy



bucketBy



sortBy



save



queryName



startStream



foreach
yes


insertInto



saveAsTable



jdbc
yes


json



parquet



orc



text



csv




## How was this patch tested?

New dedicated tests.

Author: Liwei Lin 

Closes #13597 from lw-lin/add-assertNotPartitioned.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb219029
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb219029
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb219029

Branch: refs/heads/master
Commit: fb219029dd1b8d2783c3e202361401048296595c
Parents: 5c16ad0
Author: Liwei Lin 
Authored: Fri Jun 10 13:01:29 2016 -0700
Committer: Shixiong Zhu 
Committed: Fri Jun 10 13:01:29 2016 -0700

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 12 +-
 .../test/DataFrameReaderWriterSuite.scala   | 42 ++--
 .../spark/sql/sources/BucketedWriteSuite.scala  |  8 ++--
 3 files changed, 52 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fb219029/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6ce59e8..78b74f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -432,6 +432,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
*/
   @Experimental
   def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+assertNotPartitioned("foreach")
 assertNotBucketed("foreach")
 assertStreaming(
   "foreach() can only be called on streaming Datasets/DataFrames.")
@@ -562,8 +563,13 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
   private def assertNotBucketed(operation: String): Unit = {
 if (numBuckets.isDefined || sortColumnNames.isDefined) {
-  throw new IllegalArgumentException(
-s"'$operation' does not support bucketing right now.")
+  throw new AnalysisException(s"'$operation' does not support bucketing 
right now")
+}
+  }
+
+  private def assertNotPartitioned(operation: String): Unit = {
+if (partitioningColumns.isDefined) {
+  throw new AnalysisException( s"'$operation' does not support 
partitioning")
 }
   }
 
@@ -646,6 +652,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
* @since 1.4.0
*/
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit 
= {
+assertNotPartitioned("jdbc")
+assertNotBucketed("jdbc")
 assertNotStreaming("jdbc() can only be called on non-continuous queries")
 
 val props = new Properties()

http://git-wip-us.apache.org/repos/asf/spark/blob/fb219029/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index bf6063a..6e0d66a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -455,8 +455,8 @@ class DataFrameReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
   .format("org.apache.spark.sql.streaming.test")
   .stream()
 val w = df.write
-val e = intercept[IllegalArgumentException](w.bucketBy(1, 
"text").startStream())
-assert(e.getMessage == "'startStream' does not support bucketing right 
now.")
+val e =