[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17077


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115153524
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,63 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, col, *cols):
+"""Buckets the output by the given columns.If specified,
--- End diff --

Nit: `columns.If` -> `columns. If`





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115153302
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,63 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, col, *cols):
+"""Buckets the output by the given columns.If specified,
+the output is laid out on the file system similar to Hive's 
bucketing scheme.
+
+:param numBuckets: the number of buckets to save
+:param col: a name of a column, or a list of names.
+:param cols: additional names (optional). If `col` is a list it 
should be empty.
+
+.. note:: Applicable for file-based data sources in combination 
with
+  :py:meth:`DataFrameWriter.saveAsTable`.
--- End diff --

Uh, yes. Bucket info is not part of the file/directory names, unlike partitioning info.
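
To make the contrast concrete, a minimal sketch (the `df`, paths, and table names below are illustrative, not from this PR):

```python
# Partitioning information is encoded in the directory layout of the output,
# e.g. /tmp/partitioned_example/y=foo/part-....parquet
df.write.partitionBy("y").mode("overwrite").parquet("/tmp/partitioned_example")

# A bucketed table written through saveAsTable keeps the bucketing spec in the
# metastore instead; the output path has no per-bucket directories.
df.write.bucketBy(3, "x").mode("overwrite").saveAsTable("bucketed_example")
```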





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-07 Thread zero323
Github user zero323 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115151299
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,63 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, col, *cols):
+"""Buckets the output by the given columns.If specified,
+the output is laid out on the file system similar to Hive's 
bucketing scheme.
+
+:param numBuckets: the number of buckets to save
+:param col: a name of a column, or a list of names.
+:param cols: additional names (optional). If `col` is a list it 
should be empty.
+
+.. note:: Applicable for file-based data sources in combination 
with
+  :py:meth:`DataFrameWriter.saveAsTable`.
--- End diff --

@gatorsmile Can we?

```
➜  spark git:(master) git rev-parse HEAD   
2cf83c47838115f71419ba5b9296c69ec1d746cd
➜  spark git:(master) bin/spark-shell 
Spark context Web UI available at http://192.168.1.101:4041
Spark context available as 'sc' (master = local[*], app id = local-1494184109262).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala> Seq(("a", 1, 3)).toDF("x", "y", "z").write.bucketBy(3, "x", "y").format("parquet").save("/tmp/foo")
org.apache.spark.sql.AnalysisException: 'save' does not support bucketing right now;
  at org.apache.spark.sql.DataFrameWriter.assertNotBucketed(DataFrameWriter.scala:305)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:231)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
  ... 48 elided
```






[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115149598
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,63 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, col, *cols):
+"""Buckets the output by the given columns.If specified,
+the output is laid out on the file system similar to Hive's 
bucketing scheme.
+
+:param numBuckets: the number of buckets to save
+:param col: a name of a column, or a list of names.
+:param cols: additional names (optional). If `col` is a list it 
should be empty.
+
+.. note:: Applicable for file-based data sources in combination 
with
+  :py:meth:`DataFrameWriter.saveAsTable`.
--- End diff --

This is not accurate. We can also use `save` to store bucketed tables without saving their metadata in the metastore.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-07 Thread zero323
Github user zero323 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115138060
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,60 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
+
+:param numBuckets: the number of buckets to save
+:param cols: name of columns
+
+.. note:: Applicable for file-based data sources in combination 
with
+  :py:meth:`DataFrameWriter.saveAsTable`.
+
+>>> (df.write.format('parquet')
+... .bucketBy(100, 'year', 'month')
+... .mode("overwrite")
+... .saveAsTable('bucketed_table'))
+"""
+if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+cols = cols[0]
+
+if not isinstance(numBuckets, int):
+raise TypeError("numBuckets should be an int, got 
{0}.".format(type(numBuckets)))
+
+if not all(isinstance(c, basestring) for c in cols):
+raise TypeError("cols argument should be a string or a 
sequence of strings.")
--- End diff --

Or we could just replace the error message with:

```
"cols argument should be a string, List[str] or Tuple[str, ...]"
```





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-07 Thread zero323
Github user zero323 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115138021
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,60 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
+
+:param numBuckets: the number of buckets to save
+:param cols: name of columns
+
+.. note:: Applicable for file-based data sources in combination 
with
+  :py:meth:`DataFrameWriter.saveAsTable`.
+
+>>> (df.write.format('parquet')
+... .bucketBy(100, 'year', 'month')
+... .mode("overwrite")
+... .saveAsTable('bucketed_table'))
+"""
+if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
--- End diff --

Why do you say that? `cols` is variadic, so it should always be `Sized`.
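
A minimal illustration of why `len(cols)` is always well defined for varargs (plain Python, not tied to this PR):

```python
def f(*cols):
    # *cols always arrives as a tuple, even when nothing is passed,
    # so len(cols) is safe; it is cols[0] that needs a non-empty check.
    return len(cols)

f()              # 0
f("x", "y")      # 2
f(["x", "y"])    # 1 -- a single list argument, hence the cols[0] unwrapping above
```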





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115136451
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,60 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
+
+:param numBuckets: the number of buckets to save
+:param cols: name of columns
+
+.. note:: Applicable for file-based data sources in combination 
with
+  :py:meth:`DataFrameWriter.saveAsTable`.
+
+>>> (df.write.format('parquet')
+... .bucketBy(100, 'year', 'month')
+... .mode("overwrite")
+... .saveAsTable('bucketed_table'))
+"""
+if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
--- End diff --

If `len(cols) == 0`, users could hit strange errors.
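
For illustration, a sketch of the failure mode and one possible guard (hypothetical, not part of this PR):

```python
# With no columns, cols is an empty tuple, so the later `col = cols[0]`
# raises a bare IndexError instead of a readable message:
#     df.write.bucketBy(3).saveAsTable("t")   # IndexError: tuple index out of range

# A possible explicit check:
if not cols:
    raise ValueError("at least one bucketing column must be specified")
```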





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-06 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115134650
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,60 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
--- End diff --

Thank you for adding the wrapper. 

Yes. We should make the Python APIs consistent with the Scala APIs, if possible.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-06 Thread zero323
Github user zero323 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115133626
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,60 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
+
+:param numBuckets: the number of buckets to save
+:param cols: name of columns
+
+.. note:: Applicable for file-based data sources in combination 
with
+  :py:meth:`DataFrameWriter.saveAsTable`.
+
+>>> (df.write.format('parquet')
+... .bucketBy(100, 'year', 'month')
+... .mode("overwrite")
+... .saveAsTable('bucketed_table'))
+"""
+if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+cols = cols[0]
+
+if not isinstance(numBuckets, int):
+raise TypeError("numBuckets should be an int, got 
{0}.".format(type(numBuckets)))
+
+if not all(isinstance(c, basestring) for c in cols):
+raise TypeError("cols argument should be a string or a 
sequence of strings.")
--- End diff --

Good point. We can support arbitrary `Iterable[str]` though. 

```python
if len(cols) == 1 and isinstance(cols[0], collections.abc.Iterable):
cols = list(cols[0])
```

The caveat is that we don't allow this anywhere else.
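
One more caveat with the `Iterable` approach, sketched below (illustrative only, using the Python 3 names; the Python 2 equivalents would be `collections.Iterable` and `basestring`): strings are themselves iterable, so a single column name would have to be excluded explicitly.

```python
import collections.abc

cols = ("year",)
# Without the extra isinstance check, list("year") would explode the name
# into single characters: ['y', 'e', 'a', 'r'].
if (len(cols) == 1 and isinstance(cols[0], collections.abc.Iterable)
        and not isinstance(cols[0], str)):
    cols = list(cols[0])
```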





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-06 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115129876
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,60 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
+
+:param numBuckets: the number of buckets to save
+:param cols: name of columns
+
+.. note:: Applicable for file-based data sources in combination 
with
+  :py:meth:`DataFrameWriter.saveAsTable`.
+
+>>> (df.write.format('parquet')
+... .bucketBy(100, 'year', 'month')
+... .mode("overwrite")
+... .saveAsTable('bucketed_table'))
+"""
+if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+cols = cols[0]
+
+if not isinstance(numBuckets, int):
+raise TypeError("numBuckets should be an int, got 
{0}.".format(type(numBuckets)))
+
+if not all(isinstance(c, basestring) for c in cols):
+raise TypeError("cols argument should be a string or a 
sequence of strings.")
--- End diff --

So I don't think we really support all sequences (the above typecheck on 
L581 requires list or tuple but there are additional types of sequences).





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-06 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115129682
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,60 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
--- End diff --

I'd copy the full description from DataFrameWriter here since comparing it 
to Hive could help people new to Spark understand what bucketBy does.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-05-06 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r115129884
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -563,6 +563,60 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.3)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
+
+:param numBuckets: the number of buckets to save
+:param cols: name of columns
+
+.. note:: Applicable for file-based data sources in combination 
with
+  :py:meth:`DataFrameWriter.saveAsTable`.
+
+>>> (df.write.format('parquet')
+... .bucketBy(100, 'year', 'month')
+... .mode("overwrite")
+... .saveAsTable('bucketed_table'))
+"""
+if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+cols = cols[0]
+
+if not isinstance(numBuckets, int):
+raise TypeError("numBuckets should be an int, got 
{0}.".format(type(numBuckets)))
+
+if not all(isinstance(c, basestring) for c in cols):
+raise TypeError("cols argument should be a string or a 
sequence of strings.")
+
+col = cols[0]
+cols = cols[1:]
+
+self._jwrite = self._jwrite.bucketBy(numBuckets, col, 
_to_seq(self._spark._sc, cols))
+return self
+
+@since(2.3)
+def sortBy(self, *cols):
+"""Sorts the output in each bucket by the given columns on the 
file system.
+
+:param cols: name of columns
+
+>>> (df.write.format('parquet')
+... .bucketBy(100, 'year', 'month')
+... .sortBy('day')
+... .mode("overwrite")
+... .saveAsTable('sorted_bucketed_table'))
+"""
+if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+cols = cols[0]
+
+if not all(isinstance(c, basestring) for c in cols):
+raise TypeError("cols argument should be a string or a 
sequence of strings.")
--- End diff --

same note as above.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-12 Thread zero323
Github user zero323 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r111221795
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2167,6 +2167,56 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+def count_bucketed_cols(names, table="pyspark_bucket"):
+"""Given a sequence of column names and a table name
+query the catalog and return number o columns which are
+used for bucketing
+"""
+cols = self.spark.catalog.listColumns(table)
+num = len([c for c in cols if c.name in names and c.isBucket])
+return num
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x"]), 1)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write two bucketing columns
+df.write.bucketBy(3, "x", 
"y").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x", "y"]), 2)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort
+df.write.bucketBy(2, 
"x").sortBy("z").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x"]), 1)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with a list of columns
+df.write.bucketBy(3, ["x", 
"y"]).mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x", "y"]), 2)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort with a list of columns
+(df.write.bucketBy(2, "x")
+.sortBy(["y", "z"])
+.mode("overwrite").saveAsTable("pyspark_bucket"))
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort with multiple columns
+(df.write.bucketBy(2, "x")
+.sortBy("y", "z")
+.mode("overwrite").saveAsTable("pyspark_bucket"))
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+self.spark.sql("DROP TABLE IF EXISTS pyspark_bucket")
--- End diff --

@holdenk Do you suggest adding `tearDown`? I thought about it, but right now the tests are so inflated (sadly, not much support for [SPARK-19224](https://issues.apache.org/jira/browse/SPARK-19224)) that it would be completely detached from the context.

On the other hand, adding an artificial `try ... finally` seems wrong.
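
For reference, a minimal sketch of the `try ... finally` variant under discussion (data and table name taken from the test, otherwise illustrative):

```python
def test_bucketed_write(self):
    data = [(1, "foo", 3.0), (2, "foo", 5.0), (3, "bar", -1.0), (4, "bar", 6.0)]
    df = self.spark.createDataFrame(data, ["x", "y", "z"])
    try:
        df.write.bucketBy(3, "x").mode("overwrite").saveAsTable("pyspark_bucket")
        # ... assertions against self.spark.catalog.listColumns("pyspark_bucket") ...
    finally:
        # Cleanup runs even if an assertion above fails.
        self.spark.sql("DROP TABLE IF EXISTS pyspark_bucket")
```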





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-12 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r111214507
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2167,6 +2167,56 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+def count_bucketed_cols(names, table="pyspark_bucket"):
+"""Given a sequence of column names and a table name
+query the catalog and return number o columns which are
+used for bucketing
+"""
+cols = self.spark.catalog.listColumns(table)
+num = len([c for c in cols if c.name in names and c.isBucket])
+return num
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x"]), 1)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write two bucketing columns
+df.write.bucketBy(3, "x", 
"y").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x", "y"]), 2)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort
+df.write.bucketBy(2, 
"x").sortBy("z").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x"]), 1)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with a list of columns
+df.write.bucketBy(3, ["x", 
"y"]).mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x", "y"]), 2)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort with a list of columns
+(df.write.bucketBy(2, "x")
+.sortBy(["y", "z"])
+.mode("overwrite").saveAsTable("pyspark_bucket"))
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort with multiple columns
+(df.write.bucketBy(2, "x")
+.sortBy("y", "z")
+.mode("overwrite").saveAsTable("pyspark_bucket"))
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+self.spark.sql("DROP TABLE IF EXISTS pyspark_bucket")
--- End diff --

If we're going to drop the table here, we should probably put it in a `finally` block.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110794303
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2167,6 +2167,61 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
+1
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write two bucketing columns
+df.write.bucketBy(3, "x", 
"y").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name in ("x", "y") and c.isBucket]),
--- End diff --

Thank you for taking my opinion into account. Yes, we should remove or change the version; I meant following the rest of the contents.

Generally, the contents of the documentation have been kept matched among the APIs in different languages, to my knowledge. I don't think this is a must, but I think it is safer, to avoid confusion for users and blame for any reason in the future.

I have seen several minor PRs fixing documentation (e.g., typos) that had to be fixed identically for the APIs in other languages, and I also made some PRs to match the documentation, e.g., https://github.com/apache/spark/pull/17429





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110792594
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2167,6 +2167,56 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+def count_bucketed_cols(names, table="pyspark_bucket"):
+"""Given a sequence of column names and a table name
+query the catalog and return number o columns which are
+used for bucketing
+"""
+cols = self.spark.catalog.listColumns(table)
+num = len([c for c in cols if c.name in names and c.isBucket])
+return num
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x"]), 1)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write two bucketing columns
+df.write.bucketBy(3, "x", 
"y").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x", "y"]), 2)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort
+df.write.bucketBy(2, 
"x").sortBy("z").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x"]), 1)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with a list of columns
+df.write.bucketBy(3, ["x", 
"y"]).mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(count_bucketed_cols(["x", "y"]), 2)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort with a list of columns
+(df.write.bucketBy(2, "x")
+.sortBy(["y", "z"])
+.mode("overwrite").saveAsTable("pyspark_bucket"))
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort with multiple columns
+(df.write.bucketBy(2, "x")
+.sortBy("y", "z")
+.mode("overwrite").saveAsTable("pyspark_bucket"))
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+self.spark.sql("DROP TABLE IF EXISTS pyspark_bucket")
--- End diff --

Yea, I think this is a correct way to drop the table.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-10 Thread zero323
Github user zero323 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110699779
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2167,6 +2167,61 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
+1
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write two bucketing columns
+df.write.bucketBy(3, "x", 
"y").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name in ("x", "y") and c.isBucket]),
--- End diff --

Copying the docs directly from the Scala docs could be confusing, since we won't support this in 2.0 and 2.1, and changes since 2.0 don't really affect us here.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-10 Thread zero323
Github user zero323 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110693499
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2167,6 +2167,61 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
+1
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write two bucketing columns
+df.write.bucketBy(3, "x", 
"y").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name in ("x", "y") and c.isBucket]),
--- End diff --

If you think it is better I'll trust your judgment.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-10 Thread zero323
Github user zero323 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110692936
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2167,6 +2167,61 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
+1
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write two bucketing columns
+df.write.bucketBy(3, "x", 
"y").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name in ("x", "y") and c.isBucket]),
+2
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort
+df.write.bucketBy(2, 
"x").sortBy("z").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
+1
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with a list of columns
+df.write.bucketBy(3, ["x", 
"y"]).mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name in ("x", "y") and c.isBucket]),
+2
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort with a list of columns
+(df.write.bucketBy(2, "x")
+.sortBy(["y", "z"])
+.mode("overwrite").saveAsTable("pyspark_bucket"))
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort with multiple columns
+(df.write.bucketBy(2, "x")
+.sortBy("y", "z")
+.mode("overwrite").saveAsTable("pyspark_bucket"))
--- End diff --

I don't think that dropping before is necessary. We overwrite on each write and name clashes are unlikely.

We can drop the table after the tests, but I am not sure how to do it right. `SQLTests` is overgrown and I am not sure if we should add `tearDown` only for this, but adding `DROP TABLE` in the test itself doesn't look right either.
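
If a `tearDown` were added anyway, a minimal sketch could look like this (hypothetical; the `SQLTests` base-class behavior is assumed):

```python
def tearDown(self):
    # Clean up the table used by the bucketed-write test, then defer to the base class.
    self.spark.sql("DROP TABLE IF EXISTS pyspark_bucket")
    super(SQLTests, self).tearDown()
```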





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110634385
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2167,6 +2167,61 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
+1
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write two bucketing columns
+df.write.bucketBy(3, "x", 
"y").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name in ("x", "y") and c.isBucket]),
+2
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort
+df.write.bucketBy(2, 
"x").sortBy("z").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
+1
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with a list of columns
+df.write.bucketBy(3, ["x", 
"y"]).mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name in ("x", "y") and c.isBucket]),
+2
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort with a list of columns
+(df.write.bucketBy(2, "x")
+.sortBy(["y", "z"])
+.mode("overwrite").saveAsTable("pyspark_bucket"))
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write with bucket and sort with multiple columns
+(df.write.bucketBy(2, "x")
+.sortBy("y", "z")
+.mode("overwrite").saveAsTable("pyspark_bucket"))
--- End diff --

@zero323, should we drop the table before or after this test?





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110632132
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2167,6 +2167,61 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
+1
+)
+self.assertSetEqual(set(data), 
set(self.spark.table("pyspark_bucket").collect()))
+
+# Test write two bucketing columns
+df.write.bucketBy(3, "x", 
"y").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name in ("x", "y") and c.isBucket]),
--- End diff --

@zero323, I am sorry. What do you think about something like the one below?

```python
cols = self.spark.catalog.listColumns("pyspark_bucket")
num = len([c for c in cols if c.name in ("x", "y") and c.isBucket])
self.assertEqual(num, 2)
```





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110630404
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -545,6 +545,57 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.2)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
--- End diff --

I think just copying it from the Scala doc is good enough to prevent the overhead of sweeping the documentation when we start to support other operations later.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110626138
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2038,6 +2038,61 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
--- End diff --

Thanks for taking a look at the related ones and trying it out.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110624985
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2038,6 +2038,61 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
--- End diff --

Yup, I won't argue for my personal preference. I am fine with it; I don't feel strongly about either.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-10 Thread zero323
Github user zero323 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110621829
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2038,6 +2038,61 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
--- End diff --

We can simplify this to

    catalog = self.spark.catalog

    sum(c.name == "x" and c.isBucket for c in catalog.listColumns("pyspark_bucket"))

if you think this is more readable, but I am not convinced that it makes sense to use a separate variable here. We have a few tests like this, we don't care about the sequence itself, and I think it would only pollute the scope. But if you have strong feelings about it, I am happy to adjust it.

Regarding the docstring style... Right now (excluding `bucketBy` and `sortBy`) we have

- 23 docstrings with:

"""
...
"""

- 7 docstrings with:

"""...
"""

in `readwriter`. As you said, both are valid, but if we want to keep only one convention it would be a good idea to adjust the whole module.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110544338
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2038,6 +2038,61 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket")
+ if c.name == "x" and c.isBucket]),
--- End diff --

BTW, maybe we should break this into multiple lines; it doesn't seem readable as it is, if more commits are going to be pushed anyway.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110542557
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -545,6 +545,57 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.2)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
--- End diff --

Both

```
"""
...
"""
```

or

```
"""...
"""
```

comply with PEP 8 for multi-line docstrings, to my knowledge, although I don't think a specific style has been preferred in this case.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-09 Thread zero323
Github user zero323 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110542103
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -545,6 +545,57 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.2)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
--- End diff --

Regarding style, I had a similar exchange with @jkbradley lately (https://github.com/apache/spark/pull/17218#pullrequestreview-29059063). If a single convention is desired, I believe it should be documented and the remaining docstrings should be adjusted. Personally I am indifferent, though PEP 8 and PEP 257 seem to prefer this convention over placing the opening quotes on a separate line.

> you might just want to copy that.

Do you mean [this](https://github.com/apache/spark/blob/364b0db75308ddd346b4ab1e032680e8eb4c1753/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L183-L186)? I wonder if we should rather document that it is allowed only with `saveAsTable`. What do you think?





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-09 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110538670
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -545,6 +545,57 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.2)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
+
+:param numBuckets: the number of buckets to save
+:param cols: name of columns
+
+>>> (df.write.format('parquet')
+... .bucketBy(100, 'year', 'month')
+... .mode("overwrite")
+... .saveAsTable('bucketed_table'))
+"""
+if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+cols = cols[0]
+
+if not isinstance(numBuckets, int):
+raise TypeError("numBuckets should be an int, got 
{0}.".format(type(numBuckets)))
+
+if not all(isinstance(c, basestring) for c in cols):
+raise TypeError("cols argument should be a string or a 
sequence of strings.")
+
+col = cols[0]
+cols = cols[1:]
+
+self._jwrite = self._jwrite.bucketBy(numBuckets, col, 
_to_seq(self._spark._sc, cols))
+return self
+
+@since(2.2)
+def sortBy(self, *cols):
+"""Sorts the output in each bucket by the given columns on the 
file system.
--- End diff --

Same comment as above with regards to the docstring





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-04-09 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r110538647
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -545,6 +545,57 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.2)
+def bucketBy(self, numBuckets, *cols):
+"""Buckets the output by the given columns on the file system.
--- End diff --

So the `bucketBy` description in the scaladoc is a bit more in-depth; you might just want to copy that.

Also, generally our style for multi-line docstrings is to have the opening `"""` on its own line.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-02-26 Thread zero323
Github user zero323 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r103139360
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2038,6 +2038,53 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket") if c.name == "x" and 
c.isBucket]),
--- End diff --

Indeed.





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-02-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r103138278
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2038,6 +2038,53 @@ def test_BinaryType_serialization(self):
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 
+def test_bucketed_write(self):
+data = [
+(1, "foo", 3.0), (2, "foo", 5.0),
+(3, "bar", -1.0), (4, "bar", 6.0),
+]
+df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+# Test write with one bucketing column
+df.write.bucketBy(3, 
"x").mode("overwrite").saveAsTable("pyspark_bucket")
+self.assertEqual(
+len([c for c in 
self.spark.catalog.listColumns("pyspark_bucket") if c.name == "x" and 
c.isBucket]),
--- End diff --

Oh, BTW, I assume it exceeds the 100-character length limit?





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-02-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17077#discussion_r103138165
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -545,6 +545,55 @@ def partitionBy(self, *cols):
 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, 
cols))
 return self
 
+@since(2.1)
--- End diff --

Maybe it should be 2.2 :)





[GitHub] spark pull request #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for...

2017-02-26 Thread zero323
GitHub user zero323 opened a pull request:

https://github.com/apache/spark/pull/17077

[SPARK-16931][PYTHON][SQL] Add Python wrapper for bucketBy 

## What changes were proposed in this pull request?

Adds Python wrappers for `DataFrameWriter.bucketBy` and 
`DataFrameWriter.sortBy` 
([SPARK-16931](https://issues.apache.org/jira/browse/SPARK-16931))
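
For reference, a usage sketch of the added wrappers, mirroring the docstring examples in this PR:

```python
(df.write.format("parquet")
    .bucketBy(100, "year", "month")
    .sortBy("day")
    .mode("overwrite")
    .saveAsTable("bucketed_table"))
```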

## How was this patch tested?

Unit tests covering new feature.

__Note__: Based on work of @GregBowyer 
(f49b9a23468f7af32cb53d2b654272757c151725)

CC @HyukjinKwon 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zero323/spark SPARK-16931

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17077.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17077


commit 3024d1cca60793f9aca2cf144bc630106374131c
Author: Greg Bowyer 
Date:   2016-08-06T00:53:30Z

[SPARK-16931][PYTHON] PySpark APIS for bucketBy and sortBy

commit 7d911c647f21ada7fb429fd7c1c5f15934ff8847
Author: zero323 
Date:   2017-02-27T02:59:22Z

Add tests for bucketed writes

commit 72c04a3f196da5223ebb44725aa88cffa81036e4
Author: zero323 
Date:   2017-02-27T02:59:52Z

Check input types in sortBy / bucketBy



