This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f7d2fbaeb4 (#25316) Enable LZMA compression in Python SDK I/O (#25317)
6f7d2fbaeb4 is described below

commit 6f7d2fbaeb4a71173df0d9553538ae79c23e6314
Author: William Ross Morrow <morro...@gmail.com>
AuthorDate: Mon Feb 13 06:31:01 2023 -0800

    (#25316) Enable LZMA compression in Python SDK I/O (#25317)
    
    * (#25316) Added naive first shot at enabling LZMA compression
    
    * (#25316) Added a draft line to CHANGES.md
    
    * (#25316) fix linter issues
    
    * (#25316) update tests (draft)
    
    * (#25316) import order in test file
---
 CHANGES.md                                    |  1 +
 sdks/python/apache_beam/io/filesystem.py      | 21 +++++++++++++++++++--
 sdks/python/apache_beam/io/filesystem_test.py | 25 +++++++++++++++++++------
 3 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d01c764b7ea..10310c6cbef 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -61,6 +61,7 @@
 
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * Added in JmsIO a retry policy for failed publications (Java) ([#24971](https://github.com/apache/beam/issues/24971)).
+* Support for `LZMA` compression/decompression of text files added to the Python SDK ([#25316](https://github.com/apache/beam/issues/25316))
 
 ## New Features / Improvements
 
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index fa1f67ac03f..142e04bc295 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -28,6 +28,7 @@ import abc
 import bz2
 import io
 import logging
+import lzma
 import os
 import posixpath
 import re
@@ -65,6 +66,10 @@ class CompressionTypes(object):
   #   .bz2 (implies BZIP2 as described below).
   #   .gz  (implies GZIP as described below)
   #   .deflate (implies DEFLATE as described below)
+  #   .zst (implies ZSTD as described below)
+  #   .zstd (implies ZSTD as described below)
+  #   .xz (implies LZMA as described below)
+  #   .lzma (implies LZMA as described below)
   # Any non-recognized extension implies UNCOMPRESSED as described below.
   AUTO = 'auto'
 
@@ -80,6 +85,9 @@ class CompressionTypes(object):
   # GZIP compression (deflate with GZIP headers).
   GZIP = 'gzip'
 
+  # LZMA compression
+  LZMA = 'lzma'
+
   # Uncompressed (i.e., may be split).
   UNCOMPRESSED = 'uncompressed'
 
@@ -92,6 +100,7 @@ class CompressionTypes(object):
         CompressionTypes.DEFLATE,
         CompressionTypes.GZIP,
         CompressionTypes.ZSTD,
+        CompressionTypes.LZMA,
         CompressionTypes.UNCOMPRESSED
     ])
     return compression_type in types
@@ -103,6 +112,7 @@ class CompressionTypes(object):
         cls.DEFLATE: 'application/x-deflate',
         cls.GZIP: 'application/x-gzip',
         cls.ZSTD: 'application/zstd',
+        cls.LZMA: 'application/lzma'
     }
     return mime_types_by_compression_type.get(compression_type, default)
 
@@ -114,7 +124,9 @@ class CompressionTypes(object):
         '.deflate': cls.DEFLATE,
         '.gz': cls.GZIP,
         '.zst': cls.ZSTD,
-        '.zstd': cls.ZSTD
+        '.zstd': cls.ZSTD,
+        '.xz': cls.LZMA,
+        '.lzma': cls.LZMA
     }
     lowercased_path = file_path.lower()
     for suffix, compression_type in compression_types_by_suffix.items():
@@ -184,6 +196,8 @@ class CompressedFile(object):
       # https://github.com/indygreg/python-zstandard/issues/157
       self._decompressor = zstandard.ZstdDecompressor(
           max_window_size=2147483648).decompressobj()
+    elif self._compression_type == CompressionTypes.LZMA:
+      self._decompressor = lzma.LZMADecompressor()
     else:
       assert self._compression_type == CompressionTypes.GZIP
       self._decompressor = zlib.decompressobj(self._gzip_mask)
@@ -196,6 +210,8 @@ class CompressedFile(object):
           zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED)
     elif self._compression_type == CompressionTypes.ZSTD:
       self._compressor = zstandard.ZstdCompressor().compressobj()
+    elif self._compression_type == CompressionTypes.LZMA:
+      self._compressor = lzma.LZMACompressor()
     else:
       assert self._compression_type == CompressionTypes.GZIP
       self._compressor = zlib.compressobj(
@@ -257,7 +273,8 @@ class CompressedFile(object):
         if (self._compression_type == CompressionTypes.BZIP2 or
             self._compression_type == CompressionTypes.DEFLATE or
             self._compression_type == CompressionTypes.ZSTD or
-            self._compression_type == CompressionTypes.GZIP):
+            self._compression_type == CompressionTypes.GZIP or
+            self._compression_type == CompressionTypes.LZMA):
           pass
         else:
           # Deflate, Gzip and bzip2 formats do not require flushing
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
index 0b1827f9f7d..52f0e502a22 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -22,6 +22,7 @@
 import bz2
 import gzip
 import logging
+import lzma
 import ntpath
 import os
 import posixpath
@@ -316,6 +317,10 @@ atomized in instants hammered around the
       compress_open = zstandard.open
       with compress_open(file_name, 'wb') as f:
         f.write(content)
+    elif compression_type == CompressionTypes.LZMA:
+      compress_open = lzma.open
+      with compress_open(file_name, 'wb') as f:
+        f.write(content)
     else:
       assert False, "Invalid compression type: %s" % compression_type
 
@@ -340,7 +345,8 @@ atomized in instants hammered around the
     for compression_type in [CompressionTypes.BZIP2,
                              CompressionTypes.DEFLATE,
                              CompressionTypes.GZIP,
-                             CompressionTypes.ZSTD]:
+                             CompressionTypes.ZSTD,
+                             CompressionTypes.LZMA]:
       file_name = self._create_compressed_file(compression_type, self.content)
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(
@@ -375,7 +381,8 @@ atomized in instants hammered around the
     for compression_type in [CompressionTypes.BZIP2,
                              CompressionTypes.DEFLATE,
                              CompressionTypes.GZIP,
-                             CompressionTypes.ZSTD]:
+                             CompressionTypes.ZSTD,
+                             CompressionTypes.LZMA]:
       file_name = self._create_compressed_file(compression_type, self.content)
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(
@@ -410,7 +417,8 @@ atomized in instants hammered around the
     for compression_type in [CompressionTypes.BZIP2,
                              CompressionTypes.DEFLATE,
                              CompressionTypes.GZIP,
-                             CompressionTypes.ZSTD]:
+                             CompressionTypes.ZSTD,
+                             CompressionTypes.LZMA]:
       file_name = self._create_compressed_file(compression_type, self.content)
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(
@@ -428,7 +436,8 @@ atomized in instants hammered around the
     for compression_type in [CompressionTypes.BZIP2,
                              CompressionTypes.DEFLATE,
                              CompressionTypes.GZIP,
-                             CompressionTypes.ZSTD]:
+                             CompressionTypes.ZSTD,
+                             CompressionTypes.LZMA]:
       file_name = self._create_compressed_file(compression_type, self.content)
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(
@@ -453,7 +462,8 @@ atomized in instants hammered around the
     for compression_type in [CompressionTypes.BZIP2,
                              CompressionTypes.DEFLATE,
                              CompressionTypes.GZIP,
-                             CompressionTypes.ZSTD]:
+                             CompressionTypes.ZSTD,
+                             CompressionTypes.LZMA]:
       file_name = self._create_compressed_file(compression_type, self.content)
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(
@@ -520,6 +530,8 @@ atomized in instants hammered around the
         compress_factory = gzip.open
       elif compression_type == CompressionTypes.ZSTD:
         compress_factory = zstandard.open
+      elif compression_type == CompressionTypes.LZMA:
+        compress_factory = lzma.open
       else:
         assert False, "Invalid compression type: %s" % compression_type
       for line in lines:
@@ -547,7 +559,8 @@ atomized in instants hammered around the
       test_lines = tuple(generate_random_line() for i in range(num_test_lines))
       for compression_type in [CompressionTypes.BZIP2,
                                CompressionTypes.GZIP,
-                               CompressionTypes.ZSTD]:
+                               CompressionTypes.ZSTD,
+                               CompressionTypes.LZMA]:
         file_name = create_test_file(compression_type, test_lines)
         timer.start()
         with open(file_name, 'rb') as f:

Reply via email to