This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 658b697  AVRO-2546: Add bzip2 and xz support to the Python3 bindings (#638)
658b697 is described below

commit 658b697a24685f7710c8596a68a86fe6a3f92725
Author: Kengo Seki <[email protected]>
AuthorDate: Tue Sep 10 19:15:33 2019 +0900

    AVRO-2546: Add bzip2 and xz support to the Python3 bindings (#638)
    
    * AVRO-2546: Add bzip2 and xz support to the Python3 bindings
    
    * Add bzip2 and xz support to the Python3 bindings
    
    * Add data interoperability test for these codecs
      executed between Java and Python3
    
    * Fix the Python2 data interop test to skip the files
      which are compressed with the unsupported codecs
    
    * Add print statements to the Python test codes so that
      developers can confirm which files are read and skipped
      during the data interop tests
    
    * Move compression codec strings to constants
---
 lang/java/avro/pom.xml                       | 37 +++++++++++++++++--
 lang/java/ipc/pom.xml                        |  4 +++
 lang/java/pom.xml                            |  5 +++
 lang/py/test/test_datafile_interop.py        | 25 +++++++------
 lang/py3/avro/datafile.py                    | 54 ++++++++++++++++++++--------
 lang/py3/avro/tests/gen_interop_data.py      |  3 +-
 lang/py3/avro/tests/test_datafile.py         |  7 ++--
 lang/py3/avro/tests/test_datafile_interop.py |  3 ++
 8 files changed, 108 insertions(+), 30 deletions(-)

diff --git a/lang/java/avro/pom.xml b/lang/java/avro/pom.xml
index 6922e1b..0bed7aa 100644
--- a/lang/java/avro/pom.xml
+++ b/lang/java/avro/pom.xml
@@ -118,6 +118,7 @@
                 </configuration>
                 <goals><goal>java</goal></goals>
               </execution>
+
               <!-- Generate random data for interop tests, using deflate codec -->
               <execution>
                 <id>interop-generate-deflate-codec</id>
@@ -133,6 +134,7 @@
                 </configuration>
                 <goals><goal>java</goal></goals>
               </execution>
+
               <!-- Generate random data for interop tests, using snappy codec -->
               <execution>
                 <id>interop-generate-snappy-codec</id>
@@ -148,6 +150,39 @@
                 </configuration>
                 <goals><goal>java</goal></goals>
               </execution>
+
+              <!-- Generate random data for interop tests, using bzip2 codec -->
+              <execution>
+                <id>interop-generate-bzip2-codec</id>
+                <phase>generate-resources</phase>
+                <configuration>
+                  <mainClass>org.apache.avro.util.RandomData</mainClass>
+                  <arguments>
+                    <argument>../../../share/test/schemas/interop.avsc</argument>
+                    <argument>../../../build/interop/data/java_bzip2.avro</argument>
+                    <argument>100</argument>
+                    <argument>bzip2</argument>
+                  </arguments>
+                </configuration>
+                <goals><goal>java</goal></goals>
+              </execution>
+
+              <!-- Generate random data for interop tests, using xz codec -->
+              <execution>
+                <id>interop-generate-xz-codec</id>
+                <phase>generate-resources</phase>
+                <configuration>
+                  <mainClass>org.apache.avro.util.RandomData</mainClass>
+                  <arguments>
+                    <argument>../../../share/test/schemas/interop.avsc</argument>
+                    <argument>../../../build/interop/data/java_xz.avro</argument>
+                    <argument>100</argument>
+                    <argument>xz</argument>
+                  </arguments>
+                </configuration>
+                <goals><goal>java</goal></goals>
+              </execution>
+
               <!-- Generate random data for interop tests, using zstandard codec -->
               <execution>
                 <id>interop-generate-zstandard-codec</id>
@@ -191,8 +226,6 @@
     <dependency>
       <groupId>org.tukaani</groupId>
       <artifactId>xz</artifactId>
-      <version>${tukaani.version}</version>
-      <scope>provided</scope>
       <optional>true</optional>
     </dependency>
     <dependency>
diff --git a/lang/java/ipc/pom.xml b/lang/java/ipc/pom.xml
index c968296..554476a 100644
--- a/lang/java/ipc/pom.xml
+++ b/lang/java/ipc/pom.xml
@@ -154,6 +154,10 @@
       <artifactId>snappy-java</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.tukaani</groupId>
+      <artifactId>xz</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.github.luben</groupId>
       <artifactId>zstd-jni</artifactId>
     </dependency>
diff --git a/lang/java/pom.xml b/lang/java/pom.xml
index 85bdfb5..fb50fc2 100644
--- a/lang/java/pom.xml
+++ b/lang/java/pom.xml
@@ -595,6 +595,11 @@
         <version>${netty-codec-http2.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.tukaani</groupId>
+        <artifactId>xz</artifactId>
+        <version>${tukaani.version}</version>
+      </dependency>
+      <dependency>
         <groupId>com.github.luben</groupId>
         <artifactId>zstd-jni</artifactId>
         <version>${zstd-jni.version}</version>
diff --git a/lang/py/test/test_datafile_interop.py b/lang/py/test/test_datafile_interop.py
index f2cecd5..a44e53e 100644
--- a/lang/py/test/test_datafile_interop.py
+++ b/lang/py/test/test_datafile_interop.py
@@ -27,17 +27,22 @@ class TestDataFileInterop(unittest.TestCase):
     print '============'
     print ''
     for f in os.listdir('@INTEROP_DATA_DIR@'):
-      print 'READING %s' % f
-      print ''
+      base_ext = os.path.splitext(os.path.basename(f))[0].split('_', 1)
+      if len(base_ext) < 2 or base_ext[1] in datafile.VALID_CODECS:
+        print 'READING %s' % f
+        print ''
 
-      # read data in binary from file
-      reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb')
-      datum_reader = io.DatumReader()
-      dfr = datafile.DataFileReader(reader, datum_reader)
-      i = 0
-      for i, datum in enumerate(dfr, 1):
-        assert datum is not None
-      assert i > 0
+        # read data in binary from file
+        reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb')
+        datum_reader = io.DatumReader()
+        dfr = datafile.DataFileReader(reader, datum_reader)
+        i = 0
+        for i, datum in enumerate(dfr, 1):
+          assert datum is not None
+        assert i > 0
+      else:
+        print 'SKIPPING %s due to an unsupported codec' % f
+        print ''
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/lang/py3/avro/datafile.py b/lang/py3/avro/datafile.py
index 3d141fc..e60b4b7 100644
--- a/lang/py3/avro/datafile.py
+++ b/lang/py3/avro/datafile.py
@@ -20,8 +20,10 @@
 
 """Read/Write Avro File Object Containers."""
 
+import bz2
 import io
 import logging
+import lzma
 import os
 import zlib
 
@@ -81,12 +83,20 @@ META_SCHEMA = schema.Parse("""
     'sync_size': SYNC_SIZE,
 })
 
+# Compression codecs:
+NULL_CODEC = 'null'
+DEFLATE_CODEC = 'deflate'
+BZIP2_CODEC = 'bzip2'
+SNAPPY_CODEC = 'snappy'
+XZ_CODEC = 'xz'
+ZSTANDARD_CODEC = 'zstandard'
+
 # Codecs supported by container files:
-VALID_CODECS = frozenset(['null', 'deflate'])
+VALID_CODECS = frozenset([NULL_CODEC, DEFLATE_CODEC, BZIP2_CODEC, XZ_CODEC])
 if has_snappy:
-  VALID_CODECS = frozenset.union(VALID_CODECS, ['snappy'])
+  VALID_CODECS = frozenset.union(VALID_CODECS, [SNAPPY_CODEC])
 if has_zstandard:
-  VALID_CODECS = frozenset.union(VALID_CODECS, ['zstandard'])
+  VALID_CODECS = frozenset.union(VALID_CODECS, [ZSTANDARD_CODEC])
 
 # Not used yet
 VALID_ENCODINGS = frozenset(['binary'])
@@ -126,7 +136,7 @@ class DataFileWriter(object):
       writer,
       datum_writer,
       writer_schema=None,
-      codec='null',
+      codec=NULL_CODEC,
   ):
     """Constructs a new DataFileWriter instance.
 
@@ -268,18 +278,24 @@ class DataFileWriter(object):
     # write block contents
     uncompressed_data = self._buffer_writer.getvalue()
     codec = self.GetMeta(CODEC_KEY).decode('utf-8')
-    if codec == 'null':
+    if codec == NULL_CODEC:
       compressed_data = uncompressed_data
       compressed_data_length = len(compressed_data)
-    elif codec == 'deflate':
+    elif codec == DEFLATE_CODEC:
       # The first two characters and last character are zlib
       # wrappers around deflate data.
       compressed_data = zlib.compress(uncompressed_data)[2:-1]
       compressed_data_length = len(compressed_data)
-    elif codec == 'snappy':
+    elif codec == BZIP2_CODEC:
+      compressed_data = bz2.compress(uncompressed_data)
+      compressed_data_length = len(compressed_data)
+    elif codec == SNAPPY_CODEC:
       compressed_data = snappy.compress(uncompressed_data)
       compressed_data_length = len(compressed_data) + 4 # crc32
-    elif codec == 'zstandard':
+    elif codec == XZ_CODEC:
+      compressed_data = lzma.compress(uncompressed_data)
+      compressed_data_length = len(compressed_data)
+    elif codec == ZSTANDARD_CODEC:
       compressed_data = zstd.ZstdCompressor().compress(uncompressed_data)
       compressed_data_length = len(compressed_data)
     else:
@@ -293,7 +309,7 @@ class DataFileWriter(object):
     self.writer.write(compressed_data)
 
     # Write CRC32 checksum for Snappy
-    if codec == 'snappy':
+    if codec == SNAPPY_CODEC:
       self.encoder.write_crc32(uncompressed_data)
 
     # write sync marker
@@ -363,7 +379,7 @@ class DataFileReader(object):
     # ensure codec is valid
     avro_codec_raw = self.GetMeta('avro.codec')
     if avro_codec_raw is None:
-      self.codec = "null"
+      self.codec = NULL_CODEC
     else:
       self.codec = avro_codec_raw.decode('utf-8')
     if self.codec not in VALID_CODECS:
@@ -486,11 +502,11 @@ class DataFileReader(object):
 
   def _read_block_header(self):
     self._block_count = self.raw_decoder.read_long()
-    if self.codec == "null":
+    if self.codec == NULL_CODEC:
       # Skip a long; we don't need to use the length.
       self.raw_decoder.skip_long()
       self._datum_decoder = self._raw_decoder
-    elif self.codec == 'deflate':
+    elif self.codec == DEFLATE_CODEC:
       # Compressed data is stored as (length, data), which
       # corresponds to how the "bytes" type is encoded.
       data = self.raw_decoder.read_bytes()
@@ -498,14 +514,24 @@ class DataFileReader(object):
       # "raw" (no zlib headers) decompression.  See zlib.h.
       uncompressed = zlib.decompress(data, -15)
       self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed))
-    elif self.codec == 'snappy':
+    elif self.codec == BZIP2_CODEC:
+      length = self.raw_decoder.read_long()
+      data = self.raw_decoder.read(length)
+      uncompressed = bz2.decompress(data)
+      self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed))
+    elif self.codec == SNAPPY_CODEC:
       # Compressed data includes a 4-byte CRC32 checksum
       length = self.raw_decoder.read_long()
       data = self.raw_decoder.read(length - 4)
       uncompressed = snappy.decompress(data)
       self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed))
       self.raw_decoder.check_crc32(uncompressed);
-    elif self.codec == 'zstandard':
+    elif self.codec == XZ_CODEC:
+      length = self.raw_decoder.read_long()
+      data = self.raw_decoder.read(length)
+      uncompressed = lzma.decompress(data)
+      self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed))
+    elif self.codec == ZSTANDARD_CODEC:
       length = self.raw_decoder.read_long()
       data = self.raw_decoder.read(length)
       uncompressed = bytearray()
diff --git a/lang/py3/avro/tests/gen_interop_data.py b/lang/py3/avro/tests/gen_interop_data.py
index d2a7d50..0306c21 100644
--- a/lang/py3/avro/tests/gen_interop_data.py
+++ b/lang/py3/avro/tests/gen_interop_data.py
@@ -22,6 +22,7 @@ import sys
 from pathlib import Path
 
 from avro import datafile, io, schema
+from avro.datafile import NULL_CODEC
 
 DATUM = {
     'intField': 12,
@@ -49,7 +50,7 @@ def generate(schema_file, output_path):
   datum_writer = io.DatumWriter()
   for codec in datafile.VALID_CODECS:
     filename = 'py3'
-    if codec != 'null':
+    if codec != NULL_CODEC:
       filename += '_' + codec
     with Path(output_path, filename).with_suffix('.avro').open('wb') as writer, \
       datafile.DataFileWriter(writer, datum_writer, interop_schema, codec) as dfw:
diff --git a/lang/py3/avro/tests/test_datafile.py b/lang/py3/avro/tests/test_datafile.py
index ca7acd9..6256f35 100644
--- a/lang/py3/avro/tests/test_datafile.py
+++ b/lang/py3/avro/tests/test_datafile.py
@@ -24,6 +24,7 @@ import tempfile
 import unittest
 
 from avro import datafile, io, schema
+from avro.datafile import NULL_CODEC, DEFLATE_CODEC, BZIP2_CODEC, SNAPPY_CODEC, XZ_CODEC, ZSTANDARD_CODEC
 
 try:
   import snappy
@@ -86,15 +87,15 @@ SCHEMAS_TO_VALIDATE = (
 )
 
 def get_codecs_to_validate():
-  codecs = ('null', 'deflate')
+  codecs = (NULL_CODEC, DEFLATE_CODEC, BZIP2_CODEC, XZ_CODEC)
 
   if HAS_SNAPPY:
-    codecs += ('snappy',)
+    codecs += (SNAPPY_CODEC,)
   else:
     logging.warning('Snappy not present, will skip testing it.')
 
   if HAS_ZSTANDARD:
-    codecs += ('zstandard',)
+    codecs += (ZSTANDARD_CODEC,)
   else:
     logging.warning('Zstandard not present, will skip testing it.')
 
diff --git a/lang/py3/avro/tests/test_datafile_interop.py b/lang/py3/avro/tests/test_datafile_interop.py
index 12d5605..a3cbdf2 100644
--- a/lang/py3/avro/tests/test_datafile_interop.py
+++ b/lang/py3/avro/tests/test_datafile_interop.py
@@ -32,12 +32,15 @@ class TestDataFileInterop(unittest.TestCase):
     for avro_file in glob.glob('../../build/interop/data/*.avro'):
       base_ext = os.path.splitext(os.path.basename(avro_file))[0].split('_', 1)
       if len(base_ext) < 2 or base_ext[1] in datafile.VALID_CODECS:
+        print('Reading {}'.format(avro_file))
         with open(avro_file, 'rb') as reader, \
           datafile.DataFileReader(reader, datum_reader) as dfr:
           i = 0
           for i, datum in enumerate(dfr, 1):
             self.assertIsNotNone(datum)
           self.assertGreater(i, 0)
+      else:
+        print('Skipping {} due to an unsupported codec'.format(avro_file))
 
 
 if __name__ == '__main__':

Reply via email to