Author: philz
Date: Wed Feb  3 06:58:46 2010
New Revision: 905913

URL: http://svn.apache.org/viewvc?rev=905913&view=rev
Log:
AVRO-386. Python implementation of compression (philz)

Added:
    hadoop/avro/trunk/lang/py/src/avro/tool.py
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/lang/py/src/avro/datafile.py
    hadoop/avro/trunk/lang/py/test/test_datafile.py

Modified: hadoop/avro/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=905913&r1=905912&r2=905913&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Feb  3 06:58:46 2010
@@ -279,6 +279,8 @@
 
     AVRO-388. Using ResolvingDecoder in GenericDatumReader (thiru)
 
+    AVRO-386. Python implementation of compression (philz)
+
   OPTIMIZATIONS
 
     AVRO-172. More efficient schema processing (massie)

Modified: hadoop/avro/trunk/lang/py/src/avro/datafile.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/src/avro/datafile.py?rev=905913&r1=905912&r2=905913&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/src/avro/datafile.py (original)
+++ hadoop/avro/trunk/lang/py/src/avro/datafile.py Wed Feb  3 06:58:46 2010
@@ -16,6 +16,7 @@
 """
 Read/Write Avro File Object Containers.
 """
+import zlib
 import uuid
 import cStringIO
 from avro import schema
@@ -37,9 +38,12 @@
    {"name": "meta", "type": {"type": "map", "values": "bytes"}},
    {"name": "sync", "type": {"type": "fixed", "name": "sync", "size": %d}}]}
 """ % (MAGIC_SIZE, SYNC_SIZE))
-VALID_CODECS = ['null']
+VALID_CODECS = ['null', 'deflate']
 VALID_ENCODINGS = ['binary'] # not used yet
 
+CODEC_KEY = "avro.codec"
+SCHEMA_KEY = "avro.schema"
+
 #
 # Exceptions
 #
@@ -61,9 +65,11 @@
     return uuid.uuid4().bytes
 
   # TODO(hammer): make 'encoder' a metadata property
-  def __init__(self, writer, datum_writer, writers_schema=None):
+  def __init__(self, writer, datum_writer, writers_schema=None, codec='null'):
     """
     If the schema is not present, presume we're appending.
+
+    @param writer: File-like object to write into.
     """
     self._writer = writer
     self._encoder = io.BinaryEncoder(writer)
@@ -74,9 +80,11 @@
     self._meta = {}
 
     if writers_schema is not None:
+      if codec not in VALID_CODECS:
+        raise DataFileException("Unknown codec: %r" % codec)
       self._sync_marker = DataFileWriter.generate_sync_marker()
-      self.set_meta('codec', 'null')
-      self.set_meta('schema', str(writers_schema))
+      self.set_meta('avro.codec', codec)
+      self.set_meta('avro.schema', str(writers_schema))
       self.datum_writer.writers_schema = writers_schema
       self._write_header()
     else:
@@ -86,11 +94,11 @@
       # TODO(hammer): collect arbitrary metadata
       # collect metadata
       self._sync_marker = dfr.sync_marker
-      self.set_meta('codec', dfr.get_meta('codec'))
+      self.set_meta('avro.codec', dfr.get_meta('avro.codec'))
 
       # get schema used to write existing file
-      schema_from_file = dfr.get_meta('schema')
-      self.set_meta('schema', schema_from_file)
+      schema_from_file = dfr.get_meta('avro.schema')
+      self.set_meta('avro.schema', schema_from_file)
       self.datum_writer.writers_schema = schema.parse(schema_from_file)
 
       # seek to the end of the file and prepare for writing
@@ -123,19 +131,28 @@
     self.datum_writer.write_data(META_SCHEMA, header, self.encoder)
 
   # TODO(hammer): make a schema for blocks and use datum_writer
-  # TODO(hammer): use codec when writing the block contents
   def _write_block(self):
     if self.block_count > 0:
       # write number of items in block
       self.encoder.write_long(self.block_count)
 
       # write block contents
-      if self.get_meta('codec') == 'null':
-        self.writer.write(self.buffer_writer.getvalue())
+      uncompressed_data = self.buffer_writer.getvalue()
+      if self.get_meta(CODEC_KEY) == 'null':
+        compressed_data = uncompressed_data
+      elif self.get_meta(CODEC_KEY) == 'deflate':
+        # The first two characters and last character are zlib
+        # wrappers around deflate data.
+        compressed_data = zlib.compress(uncompressed_data)[2:-1]
       else:
-        fail_msg = '"%s" codec is not supported.' % self.get_meta('codec')
+        fail_msg = '"%s" codec is not supported.' % self.get_meta(CODEC_KEY)
         raise DataFileException(fail_msg)
 
+      # Write length of block
+      self.encoder.write_long(len(compressed_data))
+      # Write block
+      self.writer.write(compressed_data)
+
       # write sync marker
       self.writer.write(self.sync_marker)
 
@@ -177,30 +194,34 @@
   # TODO(hammer): allow user to specify the encoder
   def __init__(self, reader, datum_reader):
     self._reader = reader
-    self._decoder = io.BinaryDecoder(reader)
+    self._raw_decoder = io.BinaryDecoder(reader)
+    self._datum_decoder = None # Maybe reset at every block.
     self._datum_reader = datum_reader
     
     # read the header: magic, meta, sync
     self._read_header()
 
     # ensure codec is valid
-    codec_from_file = self.get_meta('codec')
-    if codec_from_file is not None and codec_from_file not in VALID_CODECS:
-      raise DataFileException('Unknown codec: %s.' % codec_from_file)
+    self.codec = self.get_meta('avro.codec')
+    if self.codec is None:
+      self.codec = "null"
+    if self.codec not in VALID_CODECS:
+      raise DataFileException('Unknown codec: %s.' % self.codec)
 
     # get file length
     self._file_length = self.determine_file_length()
 
     # get ready to read
     self._block_count = 0
-    self.datum_reader.writers_schema = schema.parse(self.get_meta('schema'))
+    self.datum_reader.writers_schema = schema.parse(self.get_meta(SCHEMA_KEY))
   
   def __iter__(self):
     return self
 
   # read-only properties
   reader = property(lambda self: self._reader)
-  decoder = property(lambda self: self._decoder)
+  raw_decoder = property(lambda self: self._raw_decoder)
+  datum_decoder = property(lambda self: self._datum_decoder)
   datum_reader = property(lambda self: self._datum_reader)
   sync_marker = property(lambda self: self._sync_marker)
   meta = property(lambda self: self._meta)
@@ -235,7 +256,8 @@
     self.reader.seek(0, 0) 
 
     # read header into a dict
-    header = self.datum_reader.read_data(META_SCHEMA, META_SCHEMA, self.decoder)
+    header = self.datum_reader.read_data(
+      META_SCHEMA, META_SCHEMA, self.raw_decoder)
 
     # check magic number
     if header.get('magic') != MAGIC:
@@ -250,7 +272,19 @@
     self._sync_marker = header['sync']
 
   def _read_block_header(self):
-    self.block_count = self.decoder.read_long()
+    self.block_count = self.raw_decoder.read_long()
+    if self.codec == "null":
+      # Skip a long; we don't need to use the length.
+      self.raw_decoder.skip_long()
+      self._datum_decoder = self._raw_decoder
+    else:
+      # Compressed data is stored as (length, data), which
+      # corresponds to how bytes are stored.
+      data = self.raw_decoder.read_bytes()
+      # -15 is the log of the window size; negative indicates
+      # "raw" (no zlib headers) decompression.  See zlib.h.
+      uncompressed = zlib.decompress(data, -15)
+      self._datum_decoder = io.BinaryDecoder(cStringIO.StringIO(uncompressed))
 
   def _skip_sync(self):
     """
@@ -277,7 +311,7 @@
       else:
         self._read_block_header()
 
-    datum = self.datum_reader.read(self.decoder) 
+    datum = self.datum_reader.read(self.datum_decoder) 
     self.block_count -= 1
     return datum
 

Added: hadoop/avro/trunk/lang/py/src/avro/tool.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/src/avro/tool.py?rev=905913&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/py/src/avro/tool.py (added)
+++ hadoop/avro/trunk/lang/py/src/avro/tool.py Wed Feb  3 06:58:46 2010
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Command-line tool for manipulating Avro data files.
+
+NOTE: The API for the command-line tool is experimental.
+"""
+
+import sys
+from avro import datafile, io
+
+def main(args=sys.argv):
+  if len(args) == 1:
+    print "Usage: %s (dump)" % args[0]
+    return 1
+
+  if args[1] == "dump":
+    if len(args) != 3:
+      print "Usage: %s dump input_file" % args[0]
+      return 1
+    for d in datafile.DataFileReader(file_or_stdin(args[2]), io.DatumReader()):
+      print repr(d)
+  return 0
+  
+def file_or_stdin(f):
+  if f == "-":
+    return sys.stdin
+  else:
+    return file(f)
+
+if __name__ == "__main__":
+  sys.exit(main(sys.argv))

Modified: hadoop/avro/trunk/lang/py/test/test_datafile.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/test/test_datafile.py?rev=905913&r1=905912&r2=905913&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/test/test_datafile.py (original)
+++ hadoop/avro/trunk/lang/py/test/test_datafile.py Wed Feb  3 06:58:46 2010
@@ -51,6 +51,7 @@
 )
 
 FILENAME = 'test_datafile.out'
+CODECS_TO_VALIDATE = ('null', 'deflate')
 
 # TODO(hammer): clean up written files with ant, not os.remove
 class TestDataFile(unittest.TestCase):
@@ -61,38 +62,40 @@
     print ''
     correct = 0
     for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
-      print ''
-      print 'SCHEMA NUMBER %d' % (i + 1)
-      print '================'
-      print ''
-      print 'Schema: %s' % example_schema
-      print 'Datum: %s' % datum
-
-      # write data in binary to file 10 times
-      writer = open(FILENAME, 'wb')
-      datum_writer = io.DatumWriter()
-      schema_object = schema.parse(example_schema)
-      dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
-      for i in range(10):
-        dfw.append(datum)
-      dfw.close()
+      for codec in CODECS_TO_VALIDATE:
+        print ''
+        print 'SCHEMA NUMBER %d' % (i + 1)
+        print '================'
+        print ''
+        print 'Schema: %s' % example_schema
+        print 'Datum: %s' % datum
+        print 'Codec: %s' % codec
+
+        # write data in binary to file 10 times
+        writer = open(FILENAME, 'wb')
+        datum_writer = io.DatumWriter()
+        schema_object = schema.parse(example_schema)
+        dfw = datafile.DataFileWriter(writer, datum_writer, schema_object, codec=codec)
+        for i in range(10):
+          dfw.append(datum)
+        dfw.close()
 
-      # read data in binary from file
-      reader = open(FILENAME, 'rb')
-      datum_reader = io.DatumReader()
-      dfr = datafile.DataFileReader(reader, datum_reader)
-      round_trip_data = []
-      for datum in dfr:
-        round_trip_data.append(datum)
-
-      print 'Round Trip Data: %s' % round_trip_data
-      print 'Round Trip Data Length: %d' % len(round_trip_data)
-      is_correct = [datum] * 10 == round_trip_data
-      if is_correct: correct += 1
-      print 'Correct Round Trip: %s' % is_correct
-      print ''
+        # read data in binary from file
+        reader = open(FILENAME, 'rb')
+        datum_reader = io.DatumReader()
+        dfr = datafile.DataFileReader(reader, datum_reader)
+        round_trip_data = []
+        for datum in dfr:
+          round_trip_data.append(datum)
+
+        print 'Round Trip Data: %s' % round_trip_data
+        print 'Round Trip Data Length: %d' % len(round_trip_data)
+        is_correct = [datum] * 10 == round_trip_data
+        if is_correct: correct += 1
+        print 'Correct Round Trip: %s' % is_correct
+        print ''
     os.remove(FILENAME)
-    self.assertEquals(correct, len(SCHEMAS_TO_VALIDATE))
+    self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))
 
   def test_append(self):
     print ''
@@ -101,44 +104,46 @@
     print ''
     correct = 0
     for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
-      print ''
-      print 'SCHEMA NUMBER %d' % (i + 1)
-      print '================'
-      print ''
-      print 'Schema: %s' % example_schema
-      print 'Datum: %s' % datum
-
-      # write data in binary to file once
-      writer = open(FILENAME, 'wb')
-      datum_writer = io.DatumWriter()
-      schema_object = schema.parse(example_schema)
-      dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
-      dfw.append(datum)
-      dfw.close()
-
-      # open file, write, and close nine times
-      for i in range(9):
-        writer = open(FILENAME, 'ab+')
-        dfw = datafile.DataFileWriter(writer, io.DatumWriter())
+      for codec in CODECS_TO_VALIDATE:
+        print ''
+        print 'SCHEMA NUMBER %d' % (i + 1)
+        print '================'
+        print ''
+        print 'Schema: %s' % example_schema
+        print 'Datum: %s' % datum
+        print 'Codec: %s' % codec
+
+        # write data in binary to file once
+        writer = open(FILENAME, 'wb')
+        datum_writer = io.DatumWriter()
+        schema_object = schema.parse(example_schema)
+        dfw = datafile.DataFileWriter(writer, datum_writer, schema_object, codec=codec)
         dfw.append(datum)
         dfw.close()
 
-      # read data in binary from file
-      reader = open(FILENAME, 'rb')
-      datum_reader = io.DatumReader()
-      dfr = datafile.DataFileReader(reader, datum_reader)
-      appended_data = []
-      for datum in dfr:
-        appended_data.append(datum)
-
-      print 'Appended Data: %s' % appended_data
-      print 'Appended Data Length: %d' % len(appended_data)
-      is_correct = [datum] * 10 == appended_data
-      if is_correct: correct += 1
-      print 'Correct Appended: %s' % is_correct
-      print ''
+        # open file, write, and close nine times
+        for i in range(9):
+          writer = open(FILENAME, 'ab+')
+          dfw = datafile.DataFileWriter(writer, io.DatumWriter())
+          dfw.append(datum)
+          dfw.close()
+
+        # read data in binary from file
+        reader = open(FILENAME, 'rb')
+        datum_reader = io.DatumReader()
+        dfr = datafile.DataFileReader(reader, datum_reader)
+        appended_data = []
+        for datum in dfr:
+          appended_data.append(datum)
+
+        print 'Appended Data: %s' % appended_data
+        print 'Appended Data Length: %d' % len(appended_data)
+        is_correct = [datum] * 10 == appended_data
+        if is_correct: correct += 1
+        print 'Correct Appended: %s' % is_correct
+        print ''
     os.remove(FILENAME)
-    self.assertEquals(correct, len(SCHEMAS_TO_VALIDATE))
+    self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))
 
 if __name__ == '__main__':
   unittest.main()


Reply via email to