This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 26b8297 [SPARK-35806][PYTHON][FOLLOW-UP] Mapping the mode argument to pandas in DataFrame.to_csv 26b8297 is described below commit 26b8297fa395d4d221dcd2fc2cb379bc665242d3 Author: itholic <haejoon....@databricks.com> AuthorDate: Fri Jul 30 12:48:24 2021 +0900 [SPARK-35806][PYTHON][FOLLOW-UP] Mapping the mode argument to pandas in DataFrame.to_csv ### What changes were proposed in this pull request? This PR is a follow-up for https://github.com/apache/spark/pull/33414 to support more options for the `mode` argument in all APIs that have a `mode` argument, not only `DataFrame.to_csv`. ### Why are the changes needed? To keep the usage consistent for arguments that have the same name. ### Does this PR introduce _any_ user-facing change? More options are available for all APIs that have a `mode` argument, the same as `DataFrame.to_csv`. ### How was this patch tested? Manually tested locally. Closes #33569 from itholic/SPARK-35085-followup. 
Authored-by: itholic <haejoon....@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 94cb2bbbc276910666e7d485f8ac1b918c79c3f7) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/pandas/frame.py | 61 ++++++++++++++++++++++++---------------- python/pyspark/pandas/generic.py | 19 +++++++------ 2 files changed, 47 insertions(+), 33 deletions(-) diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index af8b5ad..de675f1f 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -103,6 +103,7 @@ from pyspark.pandas.utils import ( validate_axis, validate_bool_kwarg, validate_how, + validate_mode, verify_temp_column_name, ) from pyspark.pandas.generic import Frame @@ -4563,11 +4564,12 @@ defaultdict(<class 'list'>, {'col..., 'col...})] self, name: str, format: Optional[str] = None, - mode: str = "overwrite", + mode: str = "w", partition_cols: Optional[Union[str, List[str]]] = None, index_col: Optional[Union[str, List[str]]] = None, **options: Any ) -> None: + mode = validate_mode(mode) return self.spark.to_table(name, format, mode, partition_cols, index_col, **options) to_table.__doc__ = SparkFrameMethods.to_table.__doc__ @@ -4575,7 +4577,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})] def to_delta( self, path: str, - mode: str = "overwrite", + mode: str = "w", partition_cols: Optional[Union[str, List[str]]] = None, index_col: Optional[Union[str, List[str]]] = None, **options: "OptionalPrimitiveType" @@ -4587,14 +4589,16 @@ defaultdict(<class 'list'>, {'col..., 'col...})] ---------- path : str, required Path to write to. - mode : str {'append', 'overwrite', 'ignore', 'error', 'errorifexists'}, default - 'overwrite'. Specifies the behavior of the save operation when the destination - exists already. + mode : str + Python write mode, default 'w'. - - 'append': Append the new data to existing data. - - 'overwrite': Overwrite existing data. 
- - 'ignore': Silently ignore this operation if data already exists. - - 'error' or 'errorifexists': Throw an exception if data already exists. + .. note:: mode can accept the strings for Spark writing mode. + Such as 'append', 'overwrite', 'ignore', 'error', 'errorifexists'. + + - 'append' (equivalent to 'a'): Append the new data to existing data. + - 'overwrite' (equivalent to 'w'): Overwrite existing data. + - 'ignore': Silently ignore this operation if data already exists. + - 'error' or 'errorifexists': Throw an exception if data already exists. partition_cols : str or list of str, optional, default None Names of partitioning columns @@ -4641,6 +4645,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})] if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1: options = options.get("options") # type: ignore + mode = validate_mode(mode) self.spark.to_spark_io( path=path, mode=mode, @@ -4653,7 +4658,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})] def to_parquet( self, path: str, - mode: str = "overwrite", + mode: str = "w", partition_cols: Optional[Union[str, List[str]]] = None, compression: Optional[str] = None, index_col: Optional[Union[str, List[str]]] = None, @@ -4666,14 +4671,16 @@ defaultdict(<class 'list'>, {'col..., 'col...})] ---------- path : str, required Path to write to. - mode : str {'append', 'overwrite', 'ignore', 'error', 'errorifexists'}, - default 'overwrite'. Specifies the behavior of the save operation when the - destination exists already. + mode : str + Python write mode, default 'w'. - - 'append': Append the new data to existing data. - - 'overwrite': Overwrite existing data. - - 'ignore': Silently ignore this operation if data already exists. - - 'error' or 'errorifexists': Throw an exception if data already exists. + .. note:: mode can accept the strings for Spark writing mode. + Such as 'append', 'overwrite', 'ignore', 'error', 'errorifexists'. 
+ + - 'append' (equivalent to 'a'): Append the new data to existing data. + - 'overwrite' (equivalent to 'w'): Overwrite existing data. + - 'ignore': Silently ignore this operation if data already exists. + - 'error' or 'errorifexists': Throw an exception if data already exists. partition_cols : str or list of str, optional, default None Names of partitioning columns @@ -4715,6 +4722,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})] if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1: options = options.get("options") # type: ignore + mode = validate_mode(mode) builder = self.to_spark(index_col=index_col).write.mode(mode) if partition_cols is not None: builder.partitionBy(partition_cols) @@ -4725,7 +4733,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})] def to_orc( self, path: str, - mode: str = "overwrite", + mode: str = "w", partition_cols: Optional[Union[str, List[str]]] = None, index_col: Optional[Union[str, List[str]]] = None, **options: "OptionalPrimitiveType" @@ -4737,14 +4745,16 @@ defaultdict(<class 'list'>, {'col..., 'col...})] ---------- path : str, required Path to write to. - mode : str {'append', 'overwrite', 'ignore', 'error', 'errorifexists'}, - default 'overwrite'. Specifies the behavior of the save operation when the - destination exists already. + mode : str + Python write mode, default 'w'. + + .. note:: mode can accept the strings for Spark writing mode. + Such as 'append', 'overwrite', 'ignore', 'error', 'errorifexists'. - - 'append': Append the new data to existing data. - - 'overwrite': Overwrite existing data. - - 'ignore': Silently ignore this operation if data already exists. - - 'error' or 'errorifexists': Throw an exception if data already exists. + - 'append' (equivalent to 'a'): Append the new data to existing data. + - 'overwrite' (equivalent to 'w'): Overwrite existing data. + - 'ignore': Silently ignore this operation if data already exists. 
+ - 'error' or 'errorifexists': Throw an exception if data already exists. partition_cols : str or list of str, optional, default None Names of partitioning columns @@ -4784,6 +4794,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})] if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1: options = options.get("options") # type: ignore + mode = validate_mode(mode) self.spark.to_spark_io( path=path, mode=mode, diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py index 6ed83d0..7ec9b05 100644 --- a/python/pyspark/pandas/generic.py +++ b/python/pyspark/pandas/generic.py @@ -890,7 +890,7 @@ class Frame(object, metaclass=ABCMeta): path: Optional[str] = None, compression: str = "uncompressed", num_files: Optional[int] = None, - mode: str = "overwrite", + mode: str = "w", orient: str = "records", lines: bool = True, partition_cols: Optional[Union[str, List[str]]] = None, @@ -931,14 +931,16 @@ class Frame(object, metaclass=ABCMeta): compression is inferred from the filename. num_files : the number of files to be written in `path` directory when this is a path. - mode : str {'append', 'overwrite', 'ignore', 'error', 'errorifexists'}, - default 'overwrite'. Specifies the behavior of the save operation when the - destination exists already. + mode : str + Python write mode, default 'w'. - - 'append': Append the new data to existing data. - - 'overwrite': Overwrite existing data. - - 'ignore': Silently ignore this operation if data already exists. - - 'error' or 'errorifexists': Throw an exception if data already exists. + .. note:: mode can accept the strings for Spark writing mode. + Such as 'append', 'overwrite', 'ignore', 'error', 'errorifexists'. + + - 'append' (equivalent to 'a'): Append the new data to existing data. + - 'overwrite' (equivalent to 'w'): Overwrite existing data. + - 'ignore': Silently ignore this operation if data already exists. 
+ - 'error' or 'errorifexists': Throw an exception if data already exists. partition_cols : str or list of str, optional, default None Names of partitioning columns @@ -1014,6 +1016,7 @@ class Frame(object, metaclass=ABCMeta): ) sdf = sdf.repartition(num_files) + mode = validate_mode(mode) builder = sdf.write.mode(mode) if partition_cols is not None: builder.partitionBy(partition_cols) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org