This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 6f7d2fbaeb4 (#25316) Enable LZMA compression in Python SDK I/O (#25317) 6f7d2fbaeb4 is described below commit 6f7d2fbaeb4a71173df0d9553538ae79c23e6314 Author: William Ross Morrow <morro...@gmail.com> AuthorDate: Mon Feb 13 06:31:01 2023 -0800 (#25316) Enable LZMA compression in Python SDK I/O (#25317) * (#25316) Added naive first shot at enabling LZMA compression * (#25316) Added a draft line to CHANGES.md * (#25316) fix linter issues * (#25316) update tests (draft) * (#25316) import order in test file --- CHANGES.md | 1 + sdks/python/apache_beam/io/filesystem.py | 21 +++++++++++++++++++-- sdks/python/apache_beam/io/filesystem_test.py | 25 +++++++++++++++++++------ 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index d01c764b7ea..10310c6cbef 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -61,6 +61,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Added in JmsIO a retry policy for failed publications (Java) ([#24971](https://github.com/apache/beam/issues/24971)). +* Support for `LZMA` compression/decompression of text files added to the Python SDK ([#25316](https://github.com/apache/beam/issues/25316)) ## New Features / Improvements diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index fa1f67ac03f..142e04bc295 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -28,6 +28,7 @@ import abc import bz2 import io import logging +import lzma import os import posixpath import re @@ -65,6 +66,10 @@ class CompressionTypes(object): # .bz2 (implies BZIP2 as described below). 
# .gz (implies GZIP as described below) # .deflate (implies DEFLATE as described below) + # .zst (implies ZSTD as described below) + # .zstd (implies ZSTD as described below) + # .xz (implies LZMA as described below) + # .lzma (implies LZMA as described below) # Any non-recognized extension implies UNCOMPRESSED as described below. AUTO = 'auto' @@ -80,6 +85,9 @@ class CompressionTypes(object): # GZIP compression (deflate with GZIP headers). GZIP = 'gzip' + # LZMA compression + LZMA = 'lzma' + # Uncompressed (i.e., may be split). UNCOMPRESSED = 'uncompressed' @@ -92,6 +100,7 @@ class CompressionTypes(object): CompressionTypes.DEFLATE, CompressionTypes.GZIP, CompressionTypes.ZSTD, + CompressionTypes.LZMA, CompressionTypes.UNCOMPRESSED ]) return compression_type in types @@ -103,6 +112,7 @@ class CompressionTypes(object): cls.DEFLATE: 'application/x-deflate', cls.GZIP: 'application/x-gzip', cls.ZSTD: 'application/zstd', + cls.LZMA: 'application/lzma' } return mime_types_by_compression_type.get(compression_type, default) @@ -114,7 +124,9 @@ class CompressionTypes(object): '.deflate': cls.DEFLATE, '.gz': cls.GZIP, '.zst': cls.ZSTD, - '.zstd': cls.ZSTD + '.zstd': cls.ZSTD, + '.xz': cls.LZMA, + '.lzma': cls.LZMA } lowercased_path = file_path.lower() for suffix, compression_type in compression_types_by_suffix.items(): @@ -184,6 +196,8 @@ class CompressedFile(object): # https://github.com/indygreg/python-zstandard/issues/157 self._decompressor = zstandard.ZstdDecompressor( max_window_size=2147483648).decompressobj() + elif self._compression_type == CompressionTypes.LZMA: + self._decompressor = lzma.LZMADecompressor() else: assert self._compression_type == CompressionTypes.GZIP self._decompressor = zlib.decompressobj(self._gzip_mask) @@ -196,6 +210,8 @@ class CompressedFile(object): zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED) elif self._compression_type == CompressionTypes.ZSTD: self._compressor = zstandard.ZstdCompressor().compressobj() + elif self._compression_type == 
CompressionTypes.LZMA: + self._compressor = lzma.LZMACompressor() else: assert self._compression_type == CompressionTypes.GZIP self._compressor = zlib.compressobj( @@ -257,7 +273,8 @@ class CompressedFile(object): if (self._compression_type == CompressionTypes.BZIP2 or self._compression_type == CompressionTypes.DEFLATE or self._compression_type == CompressionTypes.ZSTD or - self._compression_type == CompressionTypes.GZIP): + self._compression_type == CompressionTypes.GZIP or + self._compression_type == CompressionTypes.LZMA): pass else: # Deflate, Gzip and bzip2 formats do not require flushing diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py index 0b1827f9f7d..52f0e502a22 100644 --- a/sdks/python/apache_beam/io/filesystem_test.py +++ b/sdks/python/apache_beam/io/filesystem_test.py @@ -22,6 +22,7 @@ import bz2 import gzip import logging +import lzma import ntpath import os import posixpath @@ -316,6 +317,10 @@ atomized in instants hammered around the compress_open = zstandard.open with compress_open(file_name, 'wb') as f: f.write(content) + elif compression_type == CompressionTypes.LZMA: + compress_open = lzma.open + with compress_open(file_name, 'wb') as f: + f.write(content) else: assert False, "Invalid compression type: %s" % compression_type @@ -340,7 +345,8 @@ atomized in instants hammered around the for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE, CompressionTypes.GZIP, - CompressionTypes.ZSTD]: + CompressionTypes.ZSTD, + CompressionTypes.LZMA]: file_name = self._create_compressed_file(compression_type, self.content) with open(file_name, 'rb') as f: compressed_fd = CompressedFile( @@ -375,7 +381,8 @@ atomized in instants hammered around the for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE, CompressionTypes.GZIP, - CompressionTypes.ZSTD]: + CompressionTypes.ZSTD, + CompressionTypes.LZMA]: file_name = self._create_compressed_file(compression_type, 
self.content) with open(file_name, 'rb') as f: compressed_fd = CompressedFile( @@ -410,7 +417,8 @@ atomized in instants hammered around the for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE, CompressionTypes.GZIP, - CompressionTypes.ZSTD]: + CompressionTypes.ZSTD, + CompressionTypes.LZMA]: file_name = self._create_compressed_file(compression_type, self.content) with open(file_name, 'rb') as f: compressed_fd = CompressedFile( @@ -428,7 +436,8 @@ atomized in instants hammered around the for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE, CompressionTypes.GZIP, - CompressionTypes.ZSTD]: + CompressionTypes.ZSTD, + CompressionTypes.LZMA]: file_name = self._create_compressed_file(compression_type, self.content) with open(file_name, 'rb') as f: compressed_fd = CompressedFile( @@ -453,7 +462,8 @@ atomized in instants hammered around the for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE, CompressionTypes.GZIP, - CompressionTypes.ZSTD]: + CompressionTypes.ZSTD, + CompressionTypes.LZMA]: file_name = self._create_compressed_file(compression_type, self.content) with open(file_name, 'rb') as f: compressed_fd = CompressedFile( @@ -520,6 +530,8 @@ atomized in instants hammered around the compress_factory = gzip.open elif compression_type == CompressionTypes.ZSTD: compress_factory = zstandard.open + elif compression_type == CompressionTypes.LZMA: + compress_factory = lzma.open else: assert False, "Invalid compression type: %s" % compression_type for line in lines: @@ -547,7 +559,8 @@ atomized in instants hammered around the test_lines = tuple(generate_random_line() for i in range(num_test_lines)) for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP, - CompressionTypes.ZSTD]: + CompressionTypes.ZSTD, + CompressionTypes.LZMA]: file_name = create_test_file(compression_type, test_lines) timer.start() with open(file_name, 'rb') as f: