Repository: spark Updated Branches: refs/heads/master 68a6dc974 -> 0917c8ee0
[SPARK-18888] partitionBy in DataStreamWriter in Python throws _to_seq not defined ## What changes were proposed in this pull request? `_to_seq` wasn't imported. ## How was this patch tested? Added partitionBy to existing write path unit test Author: Burak Yavuz <brk...@gmail.com> Closes #16297 from brkyvz/SPARK-18888. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0917c8ee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0917c8ee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0917c8ee Branch: refs/heads/master Commit: 0917c8ee07bd3de87d9754960d8e89808b5efb2f Parents: 68a6dc9 Author: Burak Yavuz <brk...@gmail.com> Authored: Thu Dec 15 14:26:54 2016 -0800 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Thu Dec 15 14:26:54 2016 -0800 ---------------------------------------------------------------------- python/pyspark/sql/streaming.py | 1 + python/pyspark/sql/tests.py | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0917c8ee/python/pyspark/sql/streaming.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index eabd5ef..5014299 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -28,6 +28,7 @@ from abc import ABCMeta, abstractmethod from pyspark import since, keyword_only from pyspark.rdd import ignore_unicode_prefix +from pyspark.sql.column import _to_seq from pyspark.sql.readwriter import OptionUtils, to_str from pyspark.sql.types import * from pyspark.sql.utils import StreamingQueryException http://git-wip-us.apache.org/repos/asf/spark/blob/0917c8ee/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 6ddd804..18fd68e 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -50,7 +50,7 @@ from pyspark.sql import SparkSession, HiveContext, Column, Row from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests -from pyspark.sql.functions import UserDefinedFunction, sha2 +from pyspark.sql.functions import UserDefinedFunction, sha2, lit from pyspark.sql.window import Window from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException @@ -1065,7 +1065,8 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual(df.schema.simpleString(), "struct<data:string>") def test_stream_save_options(self): - df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') + df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') \ + .withColumn('id', lit(1)) for q in self.spark._wrapped.streams.active: q.stop() tmpPath = tempfile.mkdtemp() @@ -1074,7 +1075,7 @@ class SQLTests(ReusedPySparkTestCase): out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') q = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \ - .format('parquet').outputMode('append').option('path', out).start() + .format('parquet').partitionBy('id').outputMode('append').option('path', out).start() try: self.assertEqual(q.name, 'this_query') self.assertTrue(q.isActive) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org