cloud-fan commented on code in PR #46672:
URL: https://github.com/apache/spark/pull/46672#discussion_r2185452186
##########
python/pyspark/sql/functions/builtin.py:
##########
@@ -26850,6 +26850,160 @@ def udtf(
return _create_py_udtf(cls=cls, returnType=returnType,
useArrow=useArrow)
+@_try_remote_functions
+def zstd_compress(
+ input: "ColumnOrName",
+ level: Optional["ColumnOrName"] = None,
+ streaming_mode: Optional["ColumnOrName"] = None,
+) -> Column:
+ """
+ Returns a compressed value of `expr` using Zstandard with the specified
compression `level`.
+ The default level is 3. Uses single-pass mode by default.
+
+ .. versionadded:: 4.0.0
+
+ Parameters
+ ----------
+ input : :class:`~pyspark.sql.Column` or str
+ The binary value to compress.
+ level : :class:`~pyspark.sql.Column` or int, optional
+ Optional integer argument that represents the compression level. The
compression level
+ controls the trade-off between compression speed and compression
ratio. Valid values:
+ between 1 and 22 inclusive, where 1 means fastest but lowest
compression ratio, and 22 means
+ slowest but highest compression ratio. The default level is 3 if not
specified.
+ streaming_mode : :class:`~pyspark.sql.Column` or bool, optional
+ Optional boolean argument that represents whether to use streaming
mode. If true, the
+ function will compress in streaming mode. The default value is false.
+
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ A new column that contains a compressed value.
+
+ Examples
+ --------
+ Example 1: Compress data using Zstandard.
+
+ >>> import pyspark.sql.functions as sf
+ >>> df = spark.createDataFrame([("Apache Spark " * 10,)], ["input"])
+ >>>
df.select(sf.base64(sf.zstd_compress(df.input)).alias("result")).show(truncate=False)
+ +----------------------------------------+
+ |result |
+ +----------------------------------------+
+ |KLUv/SCCpQAAaEFwYWNoZSBTcGFyayABABLS+QU=|
+ +----------------------------------------+
+
+ Example 2: Compress data using Zstandard with given compression level.
+
+ >>> import pyspark.sql.functions as sf
+ >>> df = spark.createDataFrame([("Apache Spark " * 10,)], ["input"])
+ >>> df.select(sf.base64(sf.zstd_compress(df.input,
sf.lit(5))).alias("result")).show(truncate=False)
+ +----------------------------------------+
+ |result |
+ +----------------------------------------+
+ |KLUv/SCCpQAAaEFwYWNoZSBTcGFyayABABLS+QU=|
+ +----------------------------------------+
+
+ Example 3: Compress data using Zstandard in streaming mode.
+
+ >>> import pyspark.sql.functions as sf
+ >>> df = spark.createDataFrame([("Apache Spark " * 10,)], ["input"])
+ >>> df.select(sf.base64(sf.zstd_compress(df.input, sf.lit(3),
sf.lit(True))).alias("result")).show(truncate=False)
+ +--------------------------------------------+
+ |result |
+ +--------------------------------------------+
+ |KLUv/QBYpAAAaEFwYWNoZSBTcGFyayABABLS+QUBAAA=|
+ +--------------------------------------------+
+ """ # noqa: E501
+ _level = lit(3) if level is None else level
+ _streaming_mode = lit(False) if streaming_mode is None else streaming_mode
+ return _invoke_function_over_columns("zstd_compress", input, _level,
_streaming_mode)
+
+
+@_try_remote_functions
+def zstd_decompress(
+ input: "ColumnOrName",
+) -> Column:
+ """
+ Returns the decompressed value of `expr` using Zstandard.
+ Supports data compressed in both single-pass mode and streaming mode.
+ On decompression failure, it throws an exception.
+
+ .. versionadded:: 4.0.0
Review Comment:
4.1.0
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]