spark git commit: [SPARK-8532] [SQL] In Python's DataFrameWriter, save/saveAsTable/json/parquet/jdbc always override mode
Repository: spark Updated Branches: refs/heads/branch-1.4 507381d39 -> 994abbaeb [SPARK-8532] [SQL] In Python's DataFrameWriter, save/saveAsTable/json/parquet/jdbc always override mode https://issues.apache.org/jira/browse/SPARK-8532 This PR has two changes. First, it fixes the bug that save actions (i.e. `save/saveAsTable/json/parquet/jdbc`) always override mode. Second, it adds input argument `partitionBy` to `save/saveAsTable/parquet`. Author: Yin Huai Closes #6937 from yhuai/SPARK-8532 and squashes the following commits: f972d5d [Yin Huai] davies's comment. d37abd2 [Yin Huai] style. d21290a [Yin Huai] Python doc. 889eb25 [Yin Huai] Minor refactoring and add partitionBy to save, saveAsTable, and parquet. 7fbc24b [Yin Huai] Use None instead of "error" as the default value of mode since JVM-side already uses "error" as the default value. d696dff [Yin Huai] Python style. 88eb6c4 [Yin Huai] If mode is "error", do not call mode method. c40c461 [Yin Huai] Regression test. (cherry picked from commit 5ab9fcfb01a0ad2f6c103f67c1a785d3b49e33f0) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/994abbae Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/994abbae Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/994abbae Branch: refs/heads/branch-1.4 Commit: 994abbaeb3c5444d09548291f865373ba4f1909f Parents: 507381d Author: Yin Huai Authored: Mon Jun 22 13:51:23 2015 -0700 Committer: Yin Huai Committed: Mon Jun 22 13:51:34 2015 -0700 -- python/pyspark/sql/readwriter.py | 30 +++--- python/pyspark/sql/tests.py | 32 2 files changed, 51 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/994abbae/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index f036644..1b7bc0f 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -218,7 +218,10 @@ class DataFrameWriter(object): >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ -self._jwrite = self._jwrite.mode(saveMode) +# At the JVM side, the default value of mode is already set to "error". +# So, if the given saveMode is None, we will not call JVM-side's mode method. +if saveMode is not None: +self._jwrite = self._jwrite.mode(saveMode) return self @since(1.4) @@ -253,11 +256,12 @@ class DataFrameWriter(object): """ if len(cols) == 1 and isinstance(cols[0], (list, tuple)): cols = cols[0] -self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols)) +if len(cols) > 0: +self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols)) return self @since(1.4) -def save(self, path=None, format=None, mode="error", **options): +def save(self, path=None, format=None, mode=None, partitionBy=(), **options): """Saves the contents of the :class:`DataFrame` to a data source. The data source is specified by the ``format`` and a set of ``options``. @@ -272,11 +276,12 @@ class DataFrameWriter(object): * ``overwrite``: Overwrite existing data. * ``ignore``: Silently ignore this operation if data already exists. * ``error`` (default case): Throw an exception if data already exists. +:param partitionBy: names of partitioning columns :param options: all other string options >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ -self.mode(mode).options(**options) +self.partitionBy(partitionBy).mode(mode).options(**options) if format is not None: self.format(format) if path is None: @@ -296,7 +301,7 @@ class DataFrameWriter(object): self._jwrite.mode("overwrite" if overwrite else "append").insertInto(tableName) @since(1.4) -def saveAsTable(self, name, format=None, mode="error", **options): +def saveAsTable(self, name, format=None, mode=None, partitionBy=(), **options): """Saves the content of the :class:`DataFrame` as the specified table. In the case the table already exists, behavior of this function depends on the @@ -312,15 +317,16 @@ class DataFrameWriter(object): :param name: the table name :param format: the format used to save :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error) +:param partitionBy: names of partitioning columns :param options: all other string o
spark git commit: [SPARK-8532] [SQL] In Python's DataFrameWriter, save/saveAsTable/json/parquet/jdbc always override mode
Repository: spark Updated Branches: refs/heads/master da7bbb943 -> 5ab9fcfb0 [SPARK-8532] [SQL] In Python's DataFrameWriter, save/saveAsTable/json/parquet/jdbc always override mode https://issues.apache.org/jira/browse/SPARK-8532 This PR has two changes. First, it fixes the bug that save actions (i.e. `save/saveAsTable/json/parquet/jdbc`) always override mode. Second, it adds input argument `partitionBy` to `save/saveAsTable/parquet`. Author: Yin Huai Closes #6937 from yhuai/SPARK-8532 and squashes the following commits: f972d5d [Yin Huai] davies's comment. d37abd2 [Yin Huai] style. d21290a [Yin Huai] Python doc. 889eb25 [Yin Huai] Minor refactoring and add partitionBy to save, saveAsTable, and parquet. 7fbc24b [Yin Huai] Use None instead of "error" as the default value of mode since JVM-side already uses "error" as the default value. d696dff [Yin Huai] Python style. 88eb6c4 [Yin Huai] If mode is "error", do not call mode method. c40c461 [Yin Huai] Regression test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ab9fcfb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ab9fcfb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ab9fcfb Branch: refs/heads/master Commit: 5ab9fcfb01a0ad2f6c103f67c1a785d3b49e33f0 Parents: da7bbb9 Author: Yin Huai Authored: Mon Jun 22 13:51:23 2015 -0700 Committer: Yin Huai Committed: Mon Jun 22 13:51:23 2015 -0700 -- python/pyspark/sql/readwriter.py | 30 +++--- python/pyspark/sql/tests.py | 32 2 files changed, 51 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5ab9fcfb/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index f036644..1b7bc0f 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -218,7 +218,10 @@ class DataFrameWriter(object): >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ -self._jwrite = self._jwrite.mode(saveMode) +# At the JVM side, the default value of mode is already set to "error". +# So, if the given saveMode is None, we will not call JVM-side's mode method. +if saveMode is not None: +self._jwrite = self._jwrite.mode(saveMode) return self @since(1.4) @@ -253,11 +256,12 @@ class DataFrameWriter(object): """ if len(cols) == 1 and isinstance(cols[0], (list, tuple)): cols = cols[0] -self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols)) +if len(cols) > 0: +self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols)) return self @since(1.4) -def save(self, path=None, format=None, mode="error", **options): +def save(self, path=None, format=None, mode=None, partitionBy=(), **options): """Saves the contents of the :class:`DataFrame` to a data source. The data source is specified by the ``format`` and a set of ``options``. @@ -272,11 +276,12 @@ class DataFrameWriter(object): * ``overwrite``: Overwrite existing data. * ``ignore``: Silently ignore this operation if data already exists. * ``error`` (default case): Throw an exception if data already exists. +:param partitionBy: names of partitioning columns :param options: all other string options >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ -self.mode(mode).options(**options) +self.partitionBy(partitionBy).mode(mode).options(**options) if format is not None: self.format(format) if path is None: @@ -296,7 +301,7 @@ class DataFrameWriter(object): self._jwrite.mode("overwrite" if overwrite else "append").insertInto(tableName) @since(1.4) -def saveAsTable(self, name, format=None, mode="error", **options): +def saveAsTable(self, name, format=None, mode=None, partitionBy=(), **options): """Saves the content of the :class:`DataFrame` as the specified table. In the case the table already exists, behavior of this function depends on the @@ -312,15 +317,16 @@ class DataFrameWriter(object): :param name: the table name :param format: the format used to save :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error) +:param partitionBy: names of partitioning columns :param options: all other string options """ -self.mode(mode).options(**options) +self.partitionBy(partitionBy).