This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new 658b697 AVRO-2546: Add bzip2 and xz support to the Python3 bindings
(#638)
658b697 is described below
commit 658b697a24685f7710c8596a68a86fe6a3f92725
Author: Kengo Seki <[email protected]>
AuthorDate: Tue Sep 10 19:15:33 2019 +0900
AVRO-2546: Add bzip2 and xz support to the Python3 bindings (#638)
* AVRO-2546: Add bzip2 and xz support to the Python3 bindings
* Add bzip2 and xz support to the Python3 bindings
* Add data interoperability test for these codecs
executed between Java and Python3
* Fix the Python2 data interop test to skip the files
which are compressed with the unsupported codecs
* Add print statements to the Python test code so that
developers can confirm which files are read and skipped
during the data interop tests
* Move compression codec strings to constants
---
lang/java/avro/pom.xml | 37 +++++++++++++++++--
lang/java/ipc/pom.xml | 4 +++
lang/java/pom.xml | 5 +++
lang/py/test/test_datafile_interop.py | 25 +++++++------
lang/py3/avro/datafile.py | 54 ++++++++++++++++++++--------
lang/py3/avro/tests/gen_interop_data.py | 3 +-
lang/py3/avro/tests/test_datafile.py | 7 ++--
lang/py3/avro/tests/test_datafile_interop.py | 3 ++
8 files changed, 108 insertions(+), 30 deletions(-)
diff --git a/lang/java/avro/pom.xml b/lang/java/avro/pom.xml
index 6922e1b..0bed7aa 100644
--- a/lang/java/avro/pom.xml
+++ b/lang/java/avro/pom.xml
@@ -118,6 +118,7 @@
</configuration>
<goals><goal>java</goal></goals>
</execution>
+
<!-- Generate random data for interop tests, using deflate codec
-->
<execution>
<id>interop-generate-deflate-codec</id>
@@ -133,6 +134,7 @@
</configuration>
<goals><goal>java</goal></goals>
</execution>
+
<!-- Generate random data for interop tests, using snappy codec
-->
<execution>
<id>interop-generate-snappy-codec</id>
@@ -148,6 +150,39 @@
</configuration>
<goals><goal>java</goal></goals>
</execution>
+
+ <!-- Generate random data for interop tests, using bzip2 codec
-->
+ <execution>
+ <id>interop-generate-bzip2-codec</id>
+ <phase>generate-resources</phase>
+ <configuration>
+ <mainClass>org.apache.avro.util.RandomData</mainClass>
+ <arguments>
+
<argument>../../../share/test/schemas/interop.avsc</argument>
+
<argument>../../../build/interop/data/java_bzip2.avro</argument>
+ <argument>100</argument>
+ <argument>bzip2</argument>
+ </arguments>
+ </configuration>
+ <goals><goal>java</goal></goals>
+ </execution>
+
+ <!-- Generate random data for interop tests, using xz codec -->
+ <execution>
+ <id>interop-generate-xz-codec</id>
+ <phase>generate-resources</phase>
+ <configuration>
+ <mainClass>org.apache.avro.util.RandomData</mainClass>
+ <arguments>
+
<argument>../../../share/test/schemas/interop.avsc</argument>
+
<argument>../../../build/interop/data/java_xz.avro</argument>
+ <argument>100</argument>
+ <argument>xz</argument>
+ </arguments>
+ </configuration>
+ <goals><goal>java</goal></goals>
+ </execution>
+
<!-- Generate random data for interop tests, using zstandard
codec -->
<execution>
<id>interop-generate-zstandard-codec</id>
@@ -191,8 +226,6 @@
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
- <version>${tukaani.version}</version>
- <scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
diff --git a/lang/java/ipc/pom.xml b/lang/java/ipc/pom.xml
index c968296..554476a 100644
--- a/lang/java/ipc/pom.xml
+++ b/lang/java/ipc/pom.xml
@@ -154,6 +154,10 @@
<artifactId>snappy-java</artifactId>
</dependency>
<dependency>
+ <groupId>org.tukaani</groupId>
+ <artifactId>xz</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
diff --git a/lang/java/pom.xml b/lang/java/pom.xml
index 85bdfb5..fb50fc2 100644
--- a/lang/java/pom.xml
+++ b/lang/java/pom.xml
@@ -595,6 +595,11 @@
<version>${netty-codec-http2.version}</version>
</dependency>
<dependency>
+ <groupId>org.tukaani</groupId>
+ <artifactId>xz</artifactId>
+ <version>${tukaani.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
diff --git a/lang/py/test/test_datafile_interop.py
b/lang/py/test/test_datafile_interop.py
index f2cecd5..a44e53e 100644
--- a/lang/py/test/test_datafile_interop.py
+++ b/lang/py/test/test_datafile_interop.py
@@ -27,17 +27,22 @@ class TestDataFileInterop(unittest.TestCase):
print '============'
print ''
for f in os.listdir('@INTEROP_DATA_DIR@'):
- print 'READING %s' % f
- print ''
+ base_ext = os.path.splitext(os.path.basename(f))[0].split('_', 1)
+ if len(base_ext) < 2 or base_ext[1] in datafile.VALID_CODECS:
+ print 'READING %s' % f
+ print ''
- # read data in binary from file
- reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb')
- datum_reader = io.DatumReader()
- dfr = datafile.DataFileReader(reader, datum_reader)
- i = 0
- for i, datum in enumerate(dfr, 1):
- assert datum is not None
- assert i > 0
+ # read data in binary from file
+ reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb')
+ datum_reader = io.DatumReader()
+ dfr = datafile.DataFileReader(reader, datum_reader)
+ i = 0
+ for i, datum in enumerate(dfr, 1):
+ assert datum is not None
+ assert i > 0
+ else:
+ print 'SKIPPING %s due to an unsupported codec' % f
+ print ''
if __name__ == '__main__':
unittest.main()
diff --git a/lang/py3/avro/datafile.py b/lang/py3/avro/datafile.py
index 3d141fc..e60b4b7 100644
--- a/lang/py3/avro/datafile.py
+++ b/lang/py3/avro/datafile.py
@@ -20,8 +20,10 @@
"""Read/Write Avro File Object Containers."""
+import bz2
import io
import logging
+import lzma
import os
import zlib
@@ -81,12 +83,20 @@ META_SCHEMA = schema.Parse("""
'sync_size': SYNC_SIZE,
})
+# Compression codecs:
+NULL_CODEC = 'null'
+DEFLATE_CODEC = 'deflate'
+BZIP2_CODEC = 'bzip2'
+SNAPPY_CODEC = 'snappy'
+XZ_CODEC = 'xz'
+ZSTANDARD_CODEC = 'zstandard'
+
# Codecs supported by container files:
-VALID_CODECS = frozenset(['null', 'deflate'])
+VALID_CODECS = frozenset([NULL_CODEC, DEFLATE_CODEC, BZIP2_CODEC, XZ_CODEC])
if has_snappy:
- VALID_CODECS = frozenset.union(VALID_CODECS, ['snappy'])
+ VALID_CODECS = frozenset.union(VALID_CODECS, [SNAPPY_CODEC])
if has_zstandard:
- VALID_CODECS = frozenset.union(VALID_CODECS, ['zstandard'])
+ VALID_CODECS = frozenset.union(VALID_CODECS, [ZSTANDARD_CODEC])
# Not used yet
VALID_ENCODINGS = frozenset(['binary'])
@@ -126,7 +136,7 @@ class DataFileWriter(object):
writer,
datum_writer,
writer_schema=None,
- codec='null',
+ codec=NULL_CODEC,
):
"""Constructs a new DataFileWriter instance.
@@ -268,18 +278,24 @@ class DataFileWriter(object):
# write block contents
uncompressed_data = self._buffer_writer.getvalue()
codec = self.GetMeta(CODEC_KEY).decode('utf-8')
- if codec == 'null':
+ if codec == NULL_CODEC:
compressed_data = uncompressed_data
compressed_data_length = len(compressed_data)
- elif codec == 'deflate':
+ elif codec == DEFLATE_CODEC:
# The first two characters and last character are zlib
# wrappers around deflate data.
compressed_data = zlib.compress(uncompressed_data)[2:-1]
compressed_data_length = len(compressed_data)
- elif codec == 'snappy':
+ elif codec == BZIP2_CODEC:
+ compressed_data = bz2.compress(uncompressed_data)
+ compressed_data_length = len(compressed_data)
+ elif codec == SNAPPY_CODEC:
compressed_data = snappy.compress(uncompressed_data)
compressed_data_length = len(compressed_data) + 4 # crc32
- elif codec == 'zstandard':
+ elif codec == XZ_CODEC:
+ compressed_data = lzma.compress(uncompressed_data)
+ compressed_data_length = len(compressed_data)
+ elif codec == ZSTANDARD_CODEC:
compressed_data = zstd.ZstdCompressor().compress(uncompressed_data)
compressed_data_length = len(compressed_data)
else:
@@ -293,7 +309,7 @@ class DataFileWriter(object):
self.writer.write(compressed_data)
# Write CRC32 checksum for Snappy
- if codec == 'snappy':
+ if codec == SNAPPY_CODEC:
self.encoder.write_crc32(uncompressed_data)
# write sync marker
@@ -363,7 +379,7 @@ class DataFileReader(object):
# ensure codec is valid
avro_codec_raw = self.GetMeta('avro.codec')
if avro_codec_raw is None:
- self.codec = "null"
+ self.codec = NULL_CODEC
else:
self.codec = avro_codec_raw.decode('utf-8')
if self.codec not in VALID_CODECS:
@@ -486,11 +502,11 @@ class DataFileReader(object):
def _read_block_header(self):
self._block_count = self.raw_decoder.read_long()
- if self.codec == "null":
+ if self.codec == NULL_CODEC:
# Skip a long; we don't need to use the length.
self.raw_decoder.skip_long()
self._datum_decoder = self._raw_decoder
- elif self.codec == 'deflate':
+ elif self.codec == DEFLATE_CODEC:
# Compressed data is stored as (length, data), which
# corresponds to how the "bytes" type is encoded.
data = self.raw_decoder.read_bytes()
@@ -498,14 +514,24 @@ class DataFileReader(object):
# "raw" (no zlib headers) decompression. See zlib.h.
uncompressed = zlib.decompress(data, -15)
self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed))
- elif self.codec == 'snappy':
+ elif self.codec == BZIP2_CODEC:
+ length = self.raw_decoder.read_long()
+ data = self.raw_decoder.read(length)
+ uncompressed = bz2.decompress(data)
+ self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed))
+ elif self.codec == SNAPPY_CODEC:
# Compressed data includes a 4-byte CRC32 checksum
length = self.raw_decoder.read_long()
data = self.raw_decoder.read(length - 4)
uncompressed = snappy.decompress(data)
self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed))
self.raw_decoder.check_crc32(uncompressed);
- elif self.codec == 'zstandard':
+ elif self.codec == XZ_CODEC:
+ length = self.raw_decoder.read_long()
+ data = self.raw_decoder.read(length)
+ uncompressed = lzma.decompress(data)
+ self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed))
+ elif self.codec == ZSTANDARD_CODEC:
length = self.raw_decoder.read_long()
data = self.raw_decoder.read(length)
uncompressed = bytearray()
diff --git a/lang/py3/avro/tests/gen_interop_data.py
b/lang/py3/avro/tests/gen_interop_data.py
index d2a7d50..0306c21 100644
--- a/lang/py3/avro/tests/gen_interop_data.py
+++ b/lang/py3/avro/tests/gen_interop_data.py
@@ -22,6 +22,7 @@ import sys
from pathlib import Path
from avro import datafile, io, schema
+from avro.datafile import NULL_CODEC
DATUM = {
'intField': 12,
@@ -49,7 +50,7 @@ def generate(schema_file, output_path):
datum_writer = io.DatumWriter()
for codec in datafile.VALID_CODECS:
filename = 'py3'
- if codec != 'null':
+ if codec != NULL_CODEC:
filename += '_' + codec
with Path(output_path, filename).with_suffix('.avro').open('wb') as
writer, \
datafile.DataFileWriter(writer, datum_writer, interop_schema, codec) as
dfw:
diff --git a/lang/py3/avro/tests/test_datafile.py
b/lang/py3/avro/tests/test_datafile.py
index ca7acd9..6256f35 100644
--- a/lang/py3/avro/tests/test_datafile.py
+++ b/lang/py3/avro/tests/test_datafile.py
@@ -24,6 +24,7 @@ import tempfile
import unittest
from avro import datafile, io, schema
+from avro.datafile import NULL_CODEC, DEFLATE_CODEC, BZIP2_CODEC,
SNAPPY_CODEC, XZ_CODEC, ZSTANDARD_CODEC
try:
import snappy
@@ -86,15 +87,15 @@ SCHEMAS_TO_VALIDATE = (
)
def get_codecs_to_validate():
- codecs = ('null', 'deflate')
+ codecs = (NULL_CODEC, DEFLATE_CODEC, BZIP2_CODEC, XZ_CODEC)
if HAS_SNAPPY:
- codecs += ('snappy',)
+ codecs += (SNAPPY_CODEC,)
else:
logging.warning('Snappy not present, will skip testing it.')
if HAS_ZSTANDARD:
- codecs += ('zstandard',)
+ codecs += (ZSTANDARD_CODEC,)
else:
logging.warning('Zstandard not present, will skip testing it.')
diff --git a/lang/py3/avro/tests/test_datafile_interop.py
b/lang/py3/avro/tests/test_datafile_interop.py
index 12d5605..a3cbdf2 100644
--- a/lang/py3/avro/tests/test_datafile_interop.py
+++ b/lang/py3/avro/tests/test_datafile_interop.py
@@ -32,12 +32,15 @@ class TestDataFileInterop(unittest.TestCase):
for avro_file in glob.glob('../../build/interop/data/*.avro'):
base_ext = os.path.splitext(os.path.basename(avro_file))[0].split('_', 1)
if len(base_ext) < 2 or base_ext[1] in datafile.VALID_CODECS:
+ print('Reading {}'.format(avro_file))
with open(avro_file, 'rb') as reader, \
datafile.DataFileReader(reader, datum_reader) as dfr:
i = 0
for i, datum in enumerate(dfr, 1):
self.assertIsNotNone(datum)
self.assertGreater(i, 0)
+ else:
+ print('Skipping {} due to an unsupported codec'.format(avro_file))
if __name__ == '__main__':