Author: cutting
Date: Wed Oct 7 18:36:33 2009
New Revision: 822827
URL: http://svn.apache.org/viewvc?rev=822827&view=rev
Log:
AVRO-134. Update data file format specification to include reserved metadata
keys 'codec' and 'sync'. Contributed by Thiruvalluvan M. G.
Added:
hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
hadoop/avro/trunk/src/py/avro/io.py
Modified: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=822827&r1=822826&r2=822827&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Oct 7 18:36:33 2009
@@ -58,6 +58,10 @@
AVRO-131. Permit specification of JUnit test output format.
(Giridharan Kesavan via cutting)
+ AVRO-134. Update data file format specification to include
+ reserved metadata keys "codec" and "sync". The only codec
+ currently defined is "null". (Thiruvalluvan M. G. via cutting)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/doc/content/xdocs/spec.xml?rev=822827&r1=822826&r2=822827&view=diff
==============================================================================
--- hadoop/avro/trunk/src/doc/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/src/doc/content/xdocs/spec.xml Wed Oct 7 18:36:33 2009
@@ -593,7 +593,12 @@
<li><strong>count</strong> contains the number of objects in
the file as a decimal ASCII string.</li>
<li><strong>codec</strong> the name of the compression codec
- used to compress blocks.</li>
+ used to compress blocks, as a string. The only value for codec
+ currently supported is "null" (meaning no compression is
+ performed). If codec is absent, it is assumed to be
+ "null".</li>
+ <li><strong>sync</strong> the 16-byte sync marker used in this file,
+ as a byte sequence.</li>
</ul>
<p>A normal block consists of:</p>
Added: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java?rev=822827&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java Wed Oct  7 18:36:33 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.file;
+
+/**
+ * Constants used in data files.
+ */
+class DataFileConstants {
+ public static final byte VERSION = 0;
+ public static final byte[] MAGIC = new byte[] {
+ (byte)'O', (byte)'b', (byte)'j', VERSION
+ };
+ public static final long FOOTER_BLOCK = -1;
+ public static final int SYNC_SIZE = 16;
+ public static final int SYNC_INTERVAL = 1000*SYNC_SIZE;
+
+ public static final String SCHEMA = "schema";
+ public static final String SYNC = "sync";
+ public static final String COUNT = "count";
+ public static final String CODEC = "codec";
+ public static final String NULL_CODEC = "null";
+}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java?rev=822827&r1=822826&r2=822827&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java Wed Oct  7 18:36:33 2009
@@ -45,8 +45,8 @@
private long count; // # entries in file
private long blockCount; // # entries in block
- private byte[] sync = new byte[DataFileWriter.SYNC_SIZE];
- private byte[] syncBuffer = new byte[DataFileWriter.SYNC_SIZE];
+ private byte[] sync = new byte[DataFileConstants.SYNC_SIZE];
+ private byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
/** Construct a reader for a file. */
public DataFileReader(SeekableInput sin, DatumReader<D> reader)
@@ -55,7 +55,7 @@
byte[] magic = new byte[4];
in.read(magic);
- if (!Arrays.equals(DataFileWriter.MAGIC, magic))
+ if (!Arrays.equals(DataFileConstants.MAGIC, magic))
throw new IOException("Not a data file.");
long length = in.length();
@@ -76,14 +76,18 @@
} while ((l = vin.mapNext()) != 0);
}
- this.sync = getMeta("sync");
- this.count = getMetaLong("count");
- this.schema = Schema.parse(getMetaString("schema"));
+ this.sync = getMeta(DataFileConstants.SYNC);
+ this.count = getMetaLong(DataFileConstants.COUNT);
+ String codec = getMetaString(DataFileConstants.CODEC);
+ if (codec != null && ! codec.equals(DataFileConstants.NULL_CODEC)) {
+ throw new IOException("Unknown codec: " + codec);
+ }
+ this.schema = Schema.parse(getMetaString(DataFileConstants.SCHEMA));
this.reader = reader;
reader.setSchema(schema);
- in.seek(DataFileWriter.MAGIC.length); // seek to start
+ in.seek(DataFileConstants.MAGIC.length); // seek to start
}
/** Return the value of a metadata property. */
@@ -92,8 +96,12 @@
}
/** Return the value of a metadata property. */
public synchronized String getMetaString(String key) {
+ byte[] value = getMeta(key);
+ if (value == null) {
+ return null;
+ }
try {
- return new String(getMeta(key), "UTF-8");
+ return new String(value, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
@@ -114,7 +122,7 @@
blockCount = vin.readLong(); // read blockCount
- if (blockCount == DataFileWriter.FOOTER_BLOCK) {
+ if (blockCount == DataFileConstants.FOOTER_BLOCK) {
in.seek(vin.readLong()+in.tell()); // skip a footer
blockCount = 0;
}
@@ -138,7 +146,7 @@
/** Move to the next synchronization point after a position. */
public synchronized void sync(long position) throws IOException {
- if (in.tell()+DataFileWriter.SYNC_SIZE >= in.length()) {
+ if (in.tell()+DataFileConstants.SYNC_SIZE >= in.length()) {
in.seek(in.length());
return;
}
@@ -151,7 +159,7 @@
break;
}
if (j == sync.length) { // position before sync
- in.seek(in.tell() - DataFileWriter.SYNC_SIZE);
+ in.seek(in.tell() - DataFileConstants.SYNC_SIZE);
return;
}
syncBuffer[i%sync.length] = (byte)in.read();
Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java?rev=822827&r1=822826&r2=822827&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java Wed Oct  7 18:36:33 2009
@@ -44,14 +44,6 @@
* @see DataFileReader
*/
public class DataFileWriter<D> {
- static final byte VERSION = 0;
- static final byte[] MAGIC = new byte[] {
- (byte)'O', (byte)'b', (byte)'j', VERSION
- };
- static final long FOOTER_BLOCK = -1;
- static final int SYNC_SIZE = 16;
- static final int SYNC_INTERVAL = 1000*SYNC_SIZE;
-
private Schema schema;
private DatumWriter<D> dout;
@@ -64,7 +56,7 @@
private int blockCount; // # entries in current block
private ByteArrayOutputStream buffer =
- new ByteArrayOutputStream(SYNC_INTERVAL*2);
+ new ByteArrayOutputStream(DataFileConstants.SYNC_INTERVAL*2);
private Encoder bufOut = new BinaryEncoder(buffer);
private byte[] sync; // 16 random bytes
@@ -89,10 +81,11 @@
dout.setSchema(schema);
- setMeta("sync", sync);
- setMeta("schema", schema.toString());
+ setMeta(DataFileConstants.SYNC, sync);
+ setMeta(DataFileConstants.SCHEMA, schema.toString());
+ setMeta(DataFileConstants.CODEC, DataFileConstants.NULL_CODEC);
- out.write(MAGIC);
+ out.write(DataFileConstants.MAGIC);
}
/** Set a metadata property. */
@@ -120,7 +113,7 @@
types.add(branch);
this.schema = Schema.createUnion(types);
this.dout.setSchema(schema);
- setMeta("schema", schema.toString());
+ setMeta(DataFileConstants.SCHEMA, schema.toString());
}
/** Append a datum to the file. */
@@ -128,7 +121,7 @@
dout.write(datum, bufOut);
blockCount++;
count++;
- if (buffer.size() >= SYNC_INTERVAL)
+ if (buffer.size() >= DataFileConstants.SYNC_INTERVAL)
writeBlock();
}
@@ -164,7 +157,7 @@
private void writeFooter() throws IOException {
writeBlock(); // flush any data
- setMeta("count", count); // update count
+ setMeta(DataFileConstants.COUNT, count); // update count
bufOut.writeMapStart(); // write meta entries
bufOut.setItemCount(meta.size());
for (Map.Entry<String,byte[]> entry : meta.entrySet()) {
@@ -176,7 +169,7 @@
int size = buffer.size()+4;
out.write(sync);
- vout.writeLong(FOOTER_BLOCK); // tag the block
+    vout.writeLong(DataFileConstants.FOOTER_BLOCK);     // tag the block
vout.writeLong(size);
buffer.writeTo(out);
buffer.reset();
Modified: hadoop/avro/trunk/src/py/avro/io.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=822827&r1=822826&r2=822827&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Wed Oct 7 18:36:33 2009
@@ -223,6 +223,7 @@
self.__meta = dict()
self.__sync = uuid.uuid4().bytes
self.__meta["sync"] = self.__sync
+ self.__meta["codec"] = "null"
self.__meta["schema"] = schema.stringval(schm)
     self.__writer.write(struct.pack(len(_MAGIC).__str__()+'s', _MAGIC))
@@ -313,6 +314,9 @@
self.__meta[key] = self.__decoder.readbytes()
self.__sync = self.__meta.get("sync")
self.__count = int(self.__meta.get("count"))
+ self.__codec = self.__meta.get("codec")
+ if (self.__codec != None) and (self.__codec != "null"):
+ raise schema.AvroException("Unknown codec: " + self.__codec)
self.__schema = schema.parse(self.__meta.get("schema").encode("utf-8"))
self.__blockcount = 0
self.__dreader = dreader