http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java deleted file mode 100644 index b13f499..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java +++ /dev/null @@ -1,3726 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io; - -import java.io.*; -import java.util.*; -import java.rmi.server.UID; -import java.security.MessageDigest; -import org.apache.commons.logging.*; -import org.apache.hadoop.util.Options; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.Options.CreateOpts; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionInputStream; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.io.compress.zlib.ZlibFactory; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.Serializer; -import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.Progress; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.NativeCodeLoader; -import org.apache.hadoop.util.MergeSort; -import org.apache.hadoop.util.PriorityQueue; -import org.apache.hadoop.util.Time; -// ** Pivotal Changes Begin -import org.apache.hadoop.io.DataInputBuffer; -import 
org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.UTF8; -import org.apache.hadoop.io.VersionMismatchException; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.io.WritableName; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; -//** Pivotal Changes End - -/** - * <code>SequenceFile</code>s are flat files consisting of binary key/value - * pairs. - * - * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and - * {@link Sorter} classes for writing, reading and sorting respectively.</p> - * - * There are three <code>SequenceFile</code> <code>Writer</code>s based on the - * {@link CompressionType} used to compress key/value pairs: - * <ol> - * <li> - * <code>Writer</code> : Uncompressed records. - * </li> - * <li> - * <code>RecordCompressWriter</code> : Record-compressed files, only compress - * values. - * </li> - * <li> - * <code>BlockCompressWriter</code> : Block-compressed files, both keys & - * values are collected in 'blocks' - * separately and compressed. The size of - * the 'block' is configurable. 
- * </ol> - * - * <p>The actual compression algorithm used to compress key and/or values can be - * specified by using the appropriate {@link CompressionCodec}.</p> - * - * <p>The recommended way is to use the static <tt>createWriter</tt> methods - * provided by the <code>SequenceFile</code> to chose the preferred format.</p> - * - * <p>The {@link Reader} acts as the bridge and can read any of the above - * <code>SequenceFile</code> formats.</p> - * - * <h4 id="Formats">SequenceFile Formats</h4> - * - * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s - * depending on the <code>CompressionType</code> specified. All of them share a - * <a href="#Header">common header</a> described below. - * - * <h5 id="Header">SequenceFile Header</h5> - * <ul> - * <li> - * version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual - * version number (e.g. SEQ4 or SEQ6) - * </li> - * <li> - * keyClassName -key class - * </li> - * <li> - * valueClassName - value class - * </li> - * <li> - * compression - A boolean which specifies if compression is turned on for - * keys/values in this file. - * </li> - * <li> - * blockCompression - A boolean which specifies if block-compression is - * turned on for keys/values in this file. - * </li> - * <li> - * compression codec - <code>CompressionCodec</code> class which is used for - * compression of keys and/or values (if compression is - * enabled). - * </li> - * <li> - * metadata - {@link Metadata} for this file. - * </li> - * <li> - * sync - A sync marker to denote end of the header. - * </li> - * </ul> - * - * <h5 id="#UncompressedFormat">Uncompressed SequenceFile Format</h5> - * <ul> - * <li> - * <a href="#Header">Header</a> - * </li> - * <li> - * Record - * <ul> - * <li>Record length</li> - * <li>Key length</li> - * <li>Key</li> - * <li>Value</li> - * </ul> - * </li> - * <li> - * A sync-marker every few <code>100</code> bytes or so. 
- * </li> - * </ul> - * - * <h5 id="#RecordCompressedFormat">Record-Compressed SequenceFile Format</h5> - * <ul> - * <li> - * <a href="#Header">Header</a> - * </li> - * <li> - * Record - * <ul> - * <li>Record length</li> - * <li>Key length</li> - * <li>Key</li> - * <li><i>Compressed</i> Value</li> - * </ul> - * </li> - * <li> - * A sync-marker every few <code>100</code> bytes or so. - * </li> - * </ul> - * - * <h5 id="#BlockCompressedFormat">Block-Compressed SequenceFile Format</h5> - * <ul> - * <li> - * <a href="#Header">Header</a> - * </li> - * <li> - * Record <i>Block</i> - * <ul> - * <li>Uncompressed number of records in the block</li> - * <li>Compressed key-lengths block-size</li> - * <li>Compressed key-lengths block</li> - * <li>Compressed keys block-size</li> - * <li>Compressed keys block</li> - * <li>Compressed value-lengths block-size</li> - * <li>Compressed value-lengths block</li> - * <li>Compressed values block-size</li> - * <li>Compressed values block</li> - * </ul> - * </li> - * <li> - * A sync-marker every block. 
- * </li> - * </ul> - * - * <p>The compressed blocks of key lengths and value lengths consist of the - * actual lengths of individual keys/values encoded in ZeroCompressedInteger - * format.</p> - * - * @see CompressionCodec - */ -@InterfaceAudience.Public -@InterfaceStability.Stable -public class SequenceFile { - private static final Log LOG = LogFactory.getLog(SequenceFile.class); - - private SequenceFile() {} // no public ctor - - private static final byte BLOCK_COMPRESS_VERSION = (byte)4; - private static final byte CUSTOM_COMPRESS_VERSION = (byte)5; - private static final byte VERSION_WITH_METADATA = (byte)6; - private static byte[] VERSION = new byte[] { - (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA - }; - - private static final int SYNC_ESCAPE = -1; // "length" of sync entries - private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash - private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash - - /** The number of bytes between sync points.*/ - public static final int SYNC_INTERVAL = 100*SYNC_SIZE; - - /** - * The compression type used to compress key/value pairs in the - * {@link SequenceFile}. - * - * @see SequenceFile.Writer - */ - public static enum CompressionType { - /** Do not compress records. */ - NONE, - /** Compress values only, each separately. */ - RECORD, - /** Compress sequences of records together in blocks. */ - BLOCK - } - - /** - * Get the compression type for the reduce outputs - * @param job the job config to look in - * @return the kind of compression to use - */ - static public CompressionType getDefaultCompressionType(Configuration job) { - String name = job.get("io.seqfile.compression.type"); - return name == null ? CompressionType.RECORD : - CompressionType.valueOf(name); - } - - /** - * Set the default compression type for sequence files. 
- * @param job the configuration to modify - * @param val the new compression type (none, block, record) - */ - static public void setDefaultCompressionType(Configuration job, - CompressionType val) { - job.set("io.seqfile.compression.type", val.toString()); - } - - /** - * Create a new Writer with the given options. - * @param conf the configuration to use - * @param opts the options to create the file with - * @return a new Writer - * @throws IOException - */ - public static Writer createWriter(Configuration conf, Writer.Option... opts - ) throws IOException { - Writer.CompressionOption compressionOption = - Options.getOption(Writer.CompressionOption.class, opts); - CompressionType kind; - if (compressionOption != null) { - kind = compressionOption.getValue(); - } else { - kind = getDefaultCompressionType(conf); - opts = Options.prependOptions(opts, Writer.compression(kind)); - } - switch (kind) { - default: - case NONE: - return new Writer(conf, opts); - case RECORD: - return new RecordCompressWriter(conf, opts); - case BLOCK: - return new BlockCompressWriter(conf, opts); - } - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass) throws IOException { - return createWriter(conf, Writer.filesystem(fs), - Writer.file(name), Writer.keyClass(keyClass), - Writer.valueClass(valClass)); - } - - /** - * Construct the preferred type of SequenceFile Writer. 
- * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - CompressionType compressionType) throws IOException { - return createWriter(conf, Writer.filesystem(fs), - Writer.file(name), Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType)); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param progress The Progressable object to track progress. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, CompressionType compressionType, - Progressable progress) throws IOException { - return createWriter(conf, Writer.file(name), - Writer.filesystem(fs), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType), - Writer.progressable(progress)); - } - - /** - * Construct the preferred type of SequenceFile Writer. 
- * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, CompressionType compressionType, - CompressionCodec codec) throws IOException { - return createWriter(conf, Writer.file(name), - Writer.filesystem(fs), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType, codec)); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @param progress The Progressable object to track progress. - * @param metadata The metadata of the file. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. 
- */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - CompressionType compressionType, CompressionCodec codec, - Progressable progress, Metadata metadata) throws IOException { - return createWriter(conf, Writer.file(name), - Writer.filesystem(fs), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType, codec), - Writer.progressable(progress), - Writer.metadata(metadata)); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param bufferSize buffer size for the underlaying outputstream. - * @param replication replication factor for the file. - * @param blockSize block size for the file. - * @param compressionType The compression type. - * @param codec The compression codec. - * @param progress The Progressable object to track progress. - * @param metadata The metadata of the file. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. 
- */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, int bufferSize, - short replication, long blockSize, - CompressionType compressionType, CompressionCodec codec, - Progressable progress, Metadata metadata) throws IOException { - return createWriter(conf, Writer.file(name), - Writer.filesystem(fs), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.bufferSize(bufferSize), - Writer.replication(replication), - Writer.blockSize(blockSize), - Writer.compression(compressionType, codec), - Writer.progressable(progress), - Writer.metadata(metadata)); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param bufferSize buffer size for the underlaying outputstream. - * @param replication replication factor for the file. - * @param blockSize block size for the file. - * @param createParent create parent directory if non-existent - * @param compressionType The compression type. - * @param codec The compression codec. - * @param metadata The metadata of the file. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, int bufferSize, - short replication, long blockSize, boolean createParent, - CompressionType compressionType, CompressionCodec codec, - Metadata metadata) throws IOException { - return createWriter(FileContext.getFileContext(fs.getUri(), conf), - conf, name, keyClass, valClass, compressionType, codec, - metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE), - CreateOpts.bufferSize(bufferSize), - createParent ? 
CreateOpts.createParent() - : CreateOpts.donotCreateParent(), - CreateOpts.repFac(replication), - CreateOpts.blockSize(blockSize) - ); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fc The context for the specified file. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @param metadata The metadata of the file. - * @param createFlag gives the semantics of create: overwrite, append etc. - * @param opts file creation options; see {@link CreateOpts}. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - */ - public static Writer - createWriter(FileContext fc, Configuration conf, Path name, - Class keyClass, Class valClass, - CompressionType compressionType, CompressionCodec codec, - Metadata metadata, - final EnumSet<CreateFlag> createFlag, CreateOpts... opts) - throws IOException { - return createWriter(conf, fc.create(name, createFlag, opts), - keyClass, valClass, compressionType, codec, metadata).ownStream(); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @param progress The Progressable object to track progress. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. 
- */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - CompressionType compressionType, CompressionCodec codec, - Progressable progress) throws IOException { - return createWriter(conf, Writer.file(name), - Writer.filesystem(fs), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType, codec), - Writer.progressable(progress)); - } - - /** - * Construct the preferred type of 'raw' SequenceFile Writer. - * @param conf The configuration. - * @param out The stream on top which the writer is to be constructed. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @param metadata The metadata of the file. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, - CompressionType compressionType, - CompressionCodec codec, Metadata metadata) throws IOException { - return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType, codec), - Writer.metadata(metadata)); - } - - /** - * Construct the preferred type of 'raw' SequenceFile Writer. - * @param conf The configuration. - * @param out The stream on top which the writer is to be constructed. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @return Returns the handle to the constructed SequenceFile Writer. 
- * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, CompressionType compressionType, - CompressionCodec codec) throws IOException { - return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType, codec)); - } - - - /** The interface to 'raw' values of SequenceFiles. */ - public static interface ValueBytes { - - /** Writes the uncompressed bytes to the outStream. - * @param outStream : Stream to write uncompressed bytes into. - * @throws IOException - */ - public void writeUncompressedBytes(DataOutputStream outStream) - throws IOException; - - /** Write compressed bytes to outStream. - * Note: that it will NOT compress the bytes if they are not compressed. - * @param outStream : Stream to write compressed bytes into. - */ - public void writeCompressedBytes(DataOutputStream outStream) - throws IllegalArgumentException, IOException; - - /** - * Size of stored data. 
- */ - public int getSize(); - } - - private static class UncompressedBytes implements ValueBytes { - private int dataSize; - private byte[] data; - - private UncompressedBytes() { - data = null; - dataSize = 0; - } - - private void reset(DataInputStream in, int length) throws IOException { - if (data == null) { - data = new byte[length]; - } else if (length > data.length) { - data = new byte[Math.max(length, data.length * 2)]; - } - dataSize = -1; - in.readFully(data, 0, length); - dataSize = length; - } - - @Override - public int getSize() { - return dataSize; - } - - @Override - public void writeUncompressedBytes(DataOutputStream outStream) - throws IOException { - outStream.write(data, 0, dataSize); - } - - @Override - public void writeCompressedBytes(DataOutputStream outStream) - throws IllegalArgumentException, IOException { - throw - new IllegalArgumentException("UncompressedBytes cannot be compressed!"); - } - - } // UncompressedBytes - - private static class CompressedBytes implements ValueBytes { - private int dataSize; - private byte[] data; - DataInputBuffer rawData = null; - CompressionCodec codec = null; - CompressionInputStream decompressedStream = null; - - private CompressedBytes(CompressionCodec codec) { - data = null; - dataSize = 0; - this.codec = codec; - } - - private void reset(DataInputStream in, int length) throws IOException { - if (data == null) { - data = new byte[length]; - } else if (length > data.length) { - data = new byte[Math.max(length, data.length * 2)]; - } - dataSize = -1; - in.readFully(data, 0, length); - dataSize = length; - } - - @Override - public int getSize() { - return dataSize; - } - - @Override - public void writeUncompressedBytes(DataOutputStream outStream) - throws IOException { - if (decompressedStream == null) { - rawData = new DataInputBuffer(); - decompressedStream = codec.createInputStream(rawData); - } else { - decompressedStream.resetState(); - } - rawData.reset(data, 0, dataSize); - - byte[] buffer = new 
byte[8192]; - int bytesRead = 0; - while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) { - outStream.write(buffer, 0, bytesRead); - } - } - - @Override - public void writeCompressedBytes(DataOutputStream outStream) - throws IllegalArgumentException, IOException { - outStream.write(data, 0, dataSize); - } - - } // CompressedBytes - - /** - * The class encapsulating with the metadata of a file. - * The metadata of a file is a list of attribute name/value - * pairs of Text type. - * - */ - public static class Metadata implements Writable { - - private TreeMap<Text, Text> theMetadata; - - public Metadata() { - this(new TreeMap<Text, Text>()); - } - - public Metadata(TreeMap<Text, Text> arg) { - if (arg == null) { - this.theMetadata = new TreeMap<Text, Text>(); - } else { - this.theMetadata = arg; - } - } - - public Text get(Text name) { - return this.theMetadata.get(name); - } - - public void set(Text name, Text value) { - this.theMetadata.put(name, value); - } - - public TreeMap<Text, Text> getMetadata() { - return new TreeMap<Text, Text>(this.theMetadata); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(this.theMetadata.size()); - Iterator<Map.Entry<Text, Text>> iter = - this.theMetadata.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry<Text, Text> en = iter.next(); - en.getKey().write(out); - en.getValue().write(out); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - int sz = in.readInt(); - if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object"); - this.theMetadata = new TreeMap<Text, Text>(); - for (int i = 0; i < sz; i++) { - Text key = new Text(); - Text val = new Text(); - key.readFields(in); - val.readFields(in); - this.theMetadata.put(key, val); - } - } - - @Override - public boolean equals(Object other) { - if (other == null) { - return false; - } - if (other.getClass() != this.getClass()) { - return false; - } else { - 
return equals((Metadata)other); - } - } - - public boolean equals(Metadata other) { - if (other == null) return false; - if (this.theMetadata.size() != other.theMetadata.size()) { - return false; - } - Iterator<Map.Entry<Text, Text>> iter1 = - this.theMetadata.entrySet().iterator(); - Iterator<Map.Entry<Text, Text>> iter2 = - other.theMetadata.entrySet().iterator(); - while (iter1.hasNext() && iter2.hasNext()) { - Map.Entry<Text, Text> en1 = iter1.next(); - Map.Entry<Text, Text> en2 = iter2.next(); - if (!en1.getKey().equals(en2.getKey())) { - return false; - } - if (!en1.getValue().equals(en2.getValue())) { - return false; - } - } - if (iter1.hasNext() || iter2.hasNext()) { - return false; - } - return true; - } - - @Override - public int hashCode() { - assert false : "hashCode not designed"; - return 42; // any arbitrary constant will do - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("size: ").append(this.theMetadata.size()).append("\n"); - Iterator<Map.Entry<Text, Text>> iter = - this.theMetadata.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry<Text, Text> en = iter.next(); - sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString()); - sb.append("\n"); - } - return sb.toString(); - } - } - - /** Write key/value pairs to a sequence-format file. 
*/ - public static class Writer implements java.io.Closeable, Syncable { - private Configuration conf; - FSDataOutputStream out; - boolean ownOutputStream = true; - DataOutputBuffer buffer = new DataOutputBuffer(); - - Class keyClass; - Class valClass; - - private final CompressionType compress; - CompressionCodec codec = null; - CompressionOutputStream deflateFilter = null; - DataOutputStream deflateOut = null; - Metadata metadata = null; - Compressor compressor = null; - - protected Serializer keySerializer; - protected Serializer uncompressedValSerializer; - protected Serializer compressedValSerializer; - - // Insert a globally unique 16-byte value every few entries, so that one - // can seek into the middle of a file and then synchronize with record - // starts and ends by scanning for this value. - long lastSyncPos; // position of last sync - byte[] sync; // 16 random bytes - { - try { - MessageDigest digester = MessageDigest.getInstance("MD5"); - long time = Time.now(); - digester.update((new UID()+"@"+time).getBytes()); - sync = digester.digest(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static interface Option {} - - static class FileOption extends Options.PathOption - implements Option { - FileOption(Path path) { - super(path); - } - } - - /** - * @deprecated only used for backwards-compatibility in the createWriter methods - * that take FileSystem. 
- */ - @Deprecated - private static class FileSystemOption implements Option { - private final FileSystem value; - protected FileSystemOption(FileSystem value) { - this.value = value; - } - public FileSystem getValue() { - return value; - } - } - - static class StreamOption extends Options.FSDataOutputStreamOption - implements Option { - StreamOption(FSDataOutputStream stream) { - super(stream); - } - } - - static class BufferSizeOption extends Options.IntegerOption - implements Option { - BufferSizeOption(int value) { - super(value); - } - } - - static class BlockSizeOption extends Options.LongOption implements Option { - BlockSizeOption(long value) { - super(value); - } - } - - static class ReplicationOption extends Options.IntegerOption - implements Option { - ReplicationOption(int value) { - super(value); - } - } - - static class KeyClassOption extends Options.ClassOption implements Option { - KeyClassOption(Class<?> value) { - super(value); - } - } - - static class ValueClassOption extends Options.ClassOption - implements Option { - ValueClassOption(Class<?> value) { - super(value); - } - } - - static class MetadataOption implements Option { - private final Metadata value; - MetadataOption(Metadata value) { - this.value = value; - } - Metadata getValue() { - return value; - } - } - - static class ProgressableOption extends Options.ProgressableOption - implements Option { - ProgressableOption(Progressable value) { - super(value); - } - } - - private static class CompressionOption implements Option { - private final CompressionType value; - private final CompressionCodec codec; - CompressionOption(CompressionType value) { - this(value, null); - } - CompressionOption(CompressionType value, CompressionCodec codec) { - this.value = value; - this.codec = (CompressionType.NONE != value && null == codec) - ? 
new DefaultCodec() - : codec; - } - CompressionType getValue() { - return value; - } - CompressionCodec getCodec() { - return codec; - } - } - - public static Option file(Path value) { - return new FileOption(value); - } - - /** - * @deprecated only used for backwards-compatibility in the createWriter methods - * that take FileSystem. - */ - @Deprecated - private static Option filesystem(FileSystem fs) { - return new SequenceFile.Writer.FileSystemOption(fs); - } - - public static Option bufferSize(int value) { - return new BufferSizeOption(value); - } - - public static Option stream(FSDataOutputStream value) { - return new StreamOption(value); - } - - public static Option replication(short value) { - return new ReplicationOption(value); - } - - public static Option blockSize(long value) { - return new BlockSizeOption(value); - } - - public static Option progressable(Progressable value) { - return new ProgressableOption(value); - } - - public static Option keyClass(Class<?> value) { - return new KeyClassOption(value); - } - - public static Option valueClass(Class<?> value) { - return new ValueClassOption(value); - } - - public static Option metadata(Metadata value) { - return new MetadataOption(value); - } - - public static Option compression(CompressionType value) { - return new CompressionOption(value); - } - - public static Option compression(CompressionType value, - CompressionCodec codec) { - return new CompressionOption(value, codec); - } - - /** - * Construct a uncompressed writer from a set of options. - * @param conf the configuration to use - * @param opts the options used when creating the writer - * @throws IOException if it fails - */ - Writer(Configuration conf, - Option... 
opts) throws IOException { - BlockSizeOption blockSizeOption = - Options.getOption(BlockSizeOption.class, opts); - BufferSizeOption bufferSizeOption = - Options.getOption(BufferSizeOption.class, opts); - ReplicationOption replicationOption = - Options.getOption(ReplicationOption.class, opts); - ProgressableOption progressOption = - Options.getOption(ProgressableOption.class, opts); - FileOption fileOption = Options.getOption(FileOption.class, opts); - FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts); - StreamOption streamOption = Options.getOption(StreamOption.class, opts); - KeyClassOption keyClassOption = - Options.getOption(KeyClassOption.class, opts); - ValueClassOption valueClassOption = - Options.getOption(ValueClassOption.class, opts); - MetadataOption metadataOption = - Options.getOption(MetadataOption.class, opts); - CompressionOption compressionTypeOption = - Options.getOption(CompressionOption.class, opts); - // check consistency of options - if ((fileOption == null) == (streamOption == null)) { - throw new IllegalArgumentException("file or stream must be specified"); - } - if (fileOption == null && (blockSizeOption != null || - bufferSizeOption != null || - replicationOption != null || - progressOption != null)) { - throw new IllegalArgumentException("file modifier options not " + - "compatible with stream"); - } - - FSDataOutputStream out; - boolean ownStream = fileOption != null; - if (ownStream) { - Path p = fileOption.getValue(); - FileSystem fs; - if (fsOption != null) { - fs = fsOption.getValue(); - } else { - fs = p.getFileSystem(conf); - } - int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : - bufferSizeOption.getValue(); - short replication = replicationOption == null ? - fs.getDefaultReplication(p) : - (short) replicationOption.getValue(); - long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) : - blockSizeOption.getValue(); - Progressable progress = progressOption == null ? 
null : - progressOption.getValue(); - out = fs.create(p, true, bufferSize, replication, blockSize, progress); - } else { - out = streamOption.getValue(); - } - Class<?> keyClass = keyClassOption == null ? - Object.class : keyClassOption.getValue(); - Class<?> valueClass = valueClassOption == null ? - Object.class : valueClassOption.getValue(); - Metadata metadata = metadataOption == null ? - new Metadata() : metadataOption.getValue(); - this.compress = compressionTypeOption.getValue(); - final CompressionCodec codec = compressionTypeOption.getCodec(); - if (codec != null && - (codec instanceof GzipCodec) && - !NativeCodeLoader.isNativeCodeLoaded() && - !ZlibFactory.isNativeZlibLoaded(conf)) { - throw new IllegalArgumentException("SequenceFile doesn't work with " + - "GzipCodec without native-hadoop " + - "code!"); - } - init(conf, out, ownStream, keyClass, valueClass, codec, metadata); - } - - /** Create the named file. - * @deprecated Use - * {@link SequenceFile#createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. - */ - @Deprecated - public Writer(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass) throws IOException { - this.compress = CompressionType.NONE; - init(conf, fs.create(name), true, keyClass, valClass, null, - new Metadata()); - } - - /** Create the named file with write-progress reporter. - * @deprecated Use - * {@link SequenceFile#createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. - */ - @Deprecated - public Writer(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - Progressable progress, Metadata metadata) throws IOException { - this.compress = CompressionType.NONE; - init(conf, fs.create(name, progress), true, keyClass, valClass, - null, metadata); - } - - /** Create the named file with write-progress reporter. 
- * @deprecated Use - * {@link SequenceFile#createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} - * instead. - */ - @Deprecated - public Writer(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - int bufferSize, short replication, long blockSize, - Progressable progress, Metadata metadata) throws IOException { - this.compress = CompressionType.NONE; - init(conf, - fs.create(name, true, bufferSize, replication, blockSize, progress), - true, keyClass, valClass, null, metadata); - } - - boolean isCompressed() { return compress != CompressionType.NONE; } - boolean isBlockCompressed() { return compress == CompressionType.BLOCK; } - - Writer ownStream() { this.ownOutputStream = true; return this; } - - /** Write and flush the file header. */ - private void writeFileHeader() - throws IOException { - out.write(VERSION); - Text.writeString(out, keyClass.getName()); - Text.writeString(out, valClass.getName()); - - out.writeBoolean(this.isCompressed()); - out.writeBoolean(this.isBlockCompressed()); - - if (this.isCompressed()) { - Text.writeString(out, (codec.getClass()).getName()); - } - this.metadata.write(out); - out.write(sync); // write the sync bytes - out.flush(); // flush header - } - - /** Initialize. 
*/ - @SuppressWarnings("unchecked") - void init(Configuration conf, FSDataOutputStream out, boolean ownStream, - Class keyClass, Class valClass, - CompressionCodec codec, Metadata metadata) - throws IOException { - this.conf = conf; - this.out = out; - this.ownOutputStream = ownStream; - this.keyClass = keyClass; - this.valClass = valClass; - this.codec = codec; - this.metadata = metadata; - SerializationFactory serializationFactory = new SerializationFactory(conf); - this.keySerializer = serializationFactory.getSerializer(keyClass); - if (this.keySerializer == null) { - throw new IOException( - "Could not find a serializer for the Key class: '" - + keyClass.getCanonicalName() + "'. " - + "Please ensure that the configuration '" + - CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " - + "properly configured, if you're using" - + "custom serialization."); - } - this.keySerializer.open(buffer); - this.uncompressedValSerializer = serializationFactory.getSerializer(valClass); - if (this.uncompressedValSerializer == null) { - throw new IOException( - "Could not find a serializer for the Value class: '" - + valClass.getCanonicalName() + "'. " - + "Please ensure that the configuration '" + - CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " - + "properly configured, if you're using" - + "custom serialization."); - } - this.uncompressedValSerializer.open(buffer); - if (this.codec != null) { - ReflectionUtils.setConf(this.codec, this.conf); - this.compressor = CodecPool.getCompressor(this.codec); - this.deflateFilter = this.codec.createOutputStream(buffer, compressor); - this.deflateOut = - new DataOutputStream(new BufferedOutputStream(deflateFilter)); - this.compressedValSerializer = serializationFactory.getSerializer(valClass); - if (this.compressedValSerializer == null) { - throw new IOException( - "Could not find a serializer for the Value class: '" - + valClass.getCanonicalName() + "'. 
" - + "Please ensure that the configuration '" + - CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " - + "properly configured, if you're using" - + "custom serialization."); - } - this.compressedValSerializer.open(deflateOut); - } - writeFileHeader(); - } - - /** Returns the class of keys in this file. */ - public Class getKeyClass() { return keyClass; } - - /** Returns the class of values in this file. */ - public Class getValueClass() { return valClass; } - - /** Returns the compression codec of data in this file. */ - public CompressionCodec getCompressionCodec() { return codec; } - - /** create a sync point */ - public void sync() throws IOException { - if (sync != null && lastSyncPos != out.getPos()) { - out.writeInt(SYNC_ESCAPE); // mark the start of the sync - out.write(sync); // write sync - lastSyncPos = out.getPos(); // update lastSyncPos - } - } - - /** - * flush all currently written data to the file system - * @deprecated Use {@link #hsync()} or {@link #hflush()} instead - */ - @Deprecated - public void syncFs() throws IOException { - if (out != null) { - out.sync(); // flush contents to file system - } - } - - @Override - public void hsync() throws IOException { - if (out != null) { - out.hsync(); - } - } - // Pivotal changes begin - public void hsyncWithSizeUpdate() throws IOException { - if (out != null) { - if (out instanceof HdfsDataOutputStream) { - try { - ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); - } catch (NoSuchMethodError e){ - // We are probably working with an older version of hadoop jars which does not have the - // hsync function with SyncFlag. Use the hsync version that does not update the size. - out.hsync(); - } - } - else { - out.hsync(); - } - } - } - // Pivotal changes end - @Override - public void hflush() throws IOException { - if (out != null) { - out.hflush(); - } - } - - /** Returns the configuration of this file. */ - Configuration getConf() { return conf; } - - /** Close the file. 
*/ - @Override - public synchronized void close() throws IOException { - keySerializer.close(); - uncompressedValSerializer.close(); - if (compressedValSerializer != null) { - compressedValSerializer.close(); - } - - CodecPool.returnCompressor(compressor); - compressor = null; - - if (out != null) { - - // Close the underlying stream iff we own it... - if (ownOutputStream) { - out.close(); - } else { - out.flush(); - } - out = null; - } - } - - synchronized void checkAndWriteSync() throws IOException { - if (sync != null && - out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync - sync(); - } - } - - /** Append a key/value pair. */ - public void append(Writable key, Writable val) - throws IOException { - append((Object) key, (Object) val); - } - - /** Append a key/value pair. */ - @SuppressWarnings("unchecked") - public synchronized void append(Object key, Object val) - throws IOException { - if (key.getClass() != keyClass) - throw new IOException("wrong key class: "+key.getClass().getName() - +" is not "+keyClass); - if (val.getClass() != valClass) - throw new IOException("wrong value class: "+val.getClass().getName() - +" is not "+valClass); - - buffer.reset(); - - // Append the 'key' - keySerializer.serialize(key); - int keyLength = buffer.getLength(); - if (keyLength < 0) - throw new IOException("negative length keys not allowed: " + key); - - // Append the 'value' - if (compress == CompressionType.RECORD) { - deflateFilter.resetState(); - compressedValSerializer.serialize(val); - deflateOut.flush(); - deflateFilter.finish(); - } else { - uncompressedValSerializer.serialize(val); - } - - // Write the record out - checkAndWriteSync(); // sync - out.writeInt(buffer.getLength()); // total record length - out.writeInt(keyLength); // key portion length - out.write(buffer.getData(), 0, buffer.getLength()); // data - } - - public synchronized void appendRaw(byte[] keyData, int keyOffset, - int keyLength, ValueBytes val) throws IOException { - if (keyLength 
< 0) - throw new IOException("negative length keys not allowed: " + keyLength); - - int valLength = val.getSize(); - - checkAndWriteSync(); - - out.writeInt(keyLength+valLength); // total record length - out.writeInt(keyLength); // key portion length - out.write(keyData, keyOffset, keyLength); // key - val.writeUncompressedBytes(out); // value - } - - /** Returns the current length of the output file. - * - * <p>This always returns a synchronized position. In other words, - * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position - * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called. However - * the key may be earlier in the file than key last written when this - * method was called (e.g., with block-compression, it may be the first key - * in the block that was being written when this method was called). - */ - public synchronized long getLength() throws IOException { - return out.getPos(); - } - - } // class Writer - - /** Write key/compressed-value pairs to a sequence-format file. */ - static class RecordCompressWriter extends Writer { - - RecordCompressWriter(Configuration conf, - Option... options) throws IOException { - super(conf, options); - } - - /** Append a key/value pair. 
*/ - @Override - @SuppressWarnings("unchecked") - public synchronized void append(Object key, Object val) - throws IOException { - if (key.getClass() != keyClass) - throw new IOException("wrong key class: "+key.getClass().getName() - +" is not "+keyClass); - if (val.getClass() != valClass) - throw new IOException("wrong value class: "+val.getClass().getName() - +" is not "+valClass); - - buffer.reset(); - - // Append the 'key' - keySerializer.serialize(key); - int keyLength = buffer.getLength(); - if (keyLength < 0) - throw new IOException("negative length keys not allowed: " + key); - - // Compress 'value' and append it - deflateFilter.resetState(); - compressedValSerializer.serialize(val); - deflateOut.flush(); - deflateFilter.finish(); - - // Write the record out - checkAndWriteSync(); // sync - out.writeInt(buffer.getLength()); // total record length - out.writeInt(keyLength); // key portion length - out.write(buffer.getData(), 0, buffer.getLength()); // data - } - - /** Append a key/value pair. */ - @Override - public synchronized void appendRaw(byte[] keyData, int keyOffset, - int keyLength, ValueBytes val) throws IOException { - - if (keyLength < 0) - throw new IOException("negative length keys not allowed: " + keyLength); - - int valLength = val.getSize(); - - checkAndWriteSync(); // sync - out.writeInt(keyLength+valLength); // total record length - out.writeInt(keyLength); // key portion length - out.write(keyData, keyOffset, keyLength); // 'key' data - val.writeCompressedBytes(out); // 'value' data - } - - } // RecordCompressionWriter - - /** Write compressed key/value blocks to a sequence-format file. 
*/ - static class BlockCompressWriter extends Writer { - - private int noBufferedRecords = 0; - - private DataOutputBuffer keyLenBuffer = new DataOutputBuffer(); - private DataOutputBuffer keyBuffer = new DataOutputBuffer(); - - private DataOutputBuffer valLenBuffer = new DataOutputBuffer(); - private DataOutputBuffer valBuffer = new DataOutputBuffer(); - - private final int compressionBlockSize; - - BlockCompressWriter(Configuration conf, - Option... options) throws IOException { - super(conf, options); - compressionBlockSize = - conf.getInt("io.seqfile.compress.blocksize", 1000000); - keySerializer.close(); - keySerializer.open(keyBuffer); - uncompressedValSerializer.close(); - uncompressedValSerializer.open(valBuffer); - } - - /** Workhorse to check and write out compressed data/lengths */ - private synchronized - void writeBuffer(DataOutputBuffer uncompressedDataBuffer) - throws IOException { - deflateFilter.resetState(); - buffer.reset(); - deflateOut.write(uncompressedDataBuffer.getData(), 0, - uncompressedDataBuffer.getLength()); - deflateOut.flush(); - deflateFilter.finish(); - - WritableUtils.writeVInt(out, buffer.getLength()); - out.write(buffer.getData(), 0, buffer.getLength()); - } - - /** Compress and flush contents to dfs */ - @Override - public synchronized void sync() throws IOException { - if (noBufferedRecords > 0) { - super.sync(); - - // No. of records - WritableUtils.writeVInt(out, noBufferedRecords); - - // Write 'keys' and lengths - writeBuffer(keyLenBuffer); - writeBuffer(keyBuffer); - - // Write 'values' and lengths - writeBuffer(valLenBuffer); - writeBuffer(valBuffer); - - // Flush the file-stream - out.flush(); - - // Reset internal states - keyLenBuffer.reset(); - keyBuffer.reset(); - valLenBuffer.reset(); - valBuffer.reset(); - noBufferedRecords = 0; - } - - } - - /** Close the file. 
*/ - @Override - public synchronized void close() throws IOException { - if (out != null) { - sync(); - } - super.close(); - } - - /** Append a key/value pair. */ - @Override - @SuppressWarnings("unchecked") - public synchronized void append(Object key, Object val) - throws IOException { - if (key.getClass() != keyClass) - throw new IOException("wrong key class: "+key+" is not "+keyClass); - if (val.getClass() != valClass) - throw new IOException("wrong value class: "+val+" is not "+valClass); - - // Save key/value into respective buffers - int oldKeyLength = keyBuffer.getLength(); - keySerializer.serialize(key); - int keyLength = keyBuffer.getLength() - oldKeyLength; - if (keyLength < 0) - throw new IOException("negative length keys not allowed: " + key); - WritableUtils.writeVInt(keyLenBuffer, keyLength); - - int oldValLength = valBuffer.getLength(); - uncompressedValSerializer.serialize(val); - int valLength = valBuffer.getLength() - oldValLength; - WritableUtils.writeVInt(valLenBuffer, valLength); - - // Added another key/value pair - ++noBufferedRecords; - - // Compress and flush? - int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); - if (currentBlockSize >= compressionBlockSize) { - sync(); - } - } - - /** Append a key/value pair. */ - @Override - public synchronized void appendRaw(byte[] keyData, int keyOffset, - int keyLength, ValueBytes val) throws IOException { - - if (keyLength < 0) - throw new IOException("negative length keys not allowed"); - - int valLength = val.getSize(); - - // Save key/value data in relevant buffers - WritableUtils.writeVInt(keyLenBuffer, keyLength); - keyBuffer.write(keyData, keyOffset, keyLength); - WritableUtils.writeVInt(valLenBuffer, valLength); - val.writeUncompressedBytes(valBuffer); - - // Added another key/value pair - ++noBufferedRecords; - - // Compress and flush? 
- int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); - if (currentBlockSize >= compressionBlockSize) { - sync(); - } - } - - } // BlockCompressionWriter - - /** Get the configured buffer size */ - private static int getBufferSize(Configuration conf) { - return conf.getInt("io.file.buffer.size", 4096); - } - - /** Reads key/value pairs from a sequence-format file. */ - public static class Reader implements java.io.Closeable { - private String filename; - private FSDataInputStream in; - private DataOutputBuffer outBuf = new DataOutputBuffer(); - - private byte version; - - private String keyClassName; - private String valClassName; - private Class keyClass; - private Class valClass; - - private CompressionCodec codec = null; - private Metadata metadata = null; - - private byte[] sync = new byte[SYNC_HASH_SIZE]; - private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; - private boolean syncSeen; - - private long headerEnd; - private long end; - private int keyLength; - private int recordLength; - - private boolean decompress; - private boolean blockCompressed; - - private Configuration conf; - - private int noBufferedRecords = 0; - private boolean lazyDecompress = true; - private boolean valuesDecompressed = true; - - private int noBufferedKeys = 0; - private int noBufferedValues = 0; - - private DataInputBuffer keyLenBuffer = null; - private CompressionInputStream keyLenInFilter = null; - private DataInputStream keyLenIn = null; - private Decompressor keyLenDecompressor = null; - private DataInputBuffer keyBuffer = null; - private CompressionInputStream keyInFilter = null; - private DataInputStream keyIn = null; - private Decompressor keyDecompressor = null; - - private DataInputBuffer valLenBuffer = null; - private CompressionInputStream valLenInFilter = null; - private DataInputStream valLenIn = null; - private Decompressor valLenDecompressor = null; - private DataInputBuffer valBuffer = null; - private CompressionInputStream valInFilter = null; - 
private DataInputStream valIn = null; - private Decompressor valDecompressor = null; - - private Deserializer keyDeserializer; - private Deserializer valDeserializer; - - /** - * A tag interface for all of the Reader options - */ - public static interface Option {} - - /** - * Create an option to specify the path name of the sequence file. - * @param value the path to read - * @return a new option - */ - public static Option file(Path value) { - return new FileOption(value); - } - - /** - * Create an option to specify the stream with the sequence file. - * @param value the stream to read. - * @return a new option - */ - public static Option stream(FSDataInputStream value) { - return new InputStreamOption(value); - } - - /** - * Create an option to specify the starting byte to read. - * @param value the number of bytes to skip over - * @return a new option - */ - public static Option start(long value) { - return new StartOption(value); - } - - /** - * Create an option to specify the number of bytes to read. - * @param value the number of bytes to read - * @return a new option - */ - public static Option length(long value) { - return new LengthOption(value); - } - - /** - * Create an option with the buffer size for reading the given pathname. 
- * @param value the number of bytes to buffer - * @return a new option - */ - public static Option bufferSize(int value) { - return new BufferSizeOption(value); - } - - private static class FileOption extends Options.PathOption - implements Option { - private FileOption(Path value) { - super(value); - } - } - - private static class InputStreamOption - extends Options.FSDataInputStreamOption - implements Option { - private InputStreamOption(FSDataInputStream value) { - super(value); - } - } - - private static class StartOption extends Options.LongOption - implements Option { - private StartOption(long value) { - super(value); - } - } - - private static class LengthOption extends Options.LongOption - implements Option { - private LengthOption(long value) { - super(value); - } - } - - private static class BufferSizeOption extends Options.IntegerOption - implements Option { - private BufferSizeOption(int value) { - super(value); - } - } - - // only used directly - private static class OnlyHeaderOption extends Options.BooleanOption - implements Option { - private OnlyHeaderOption() { - super(true); - } - } - - public Reader(Configuration conf, Option... 
opts) throws IOException { - // Look up the options, these are null if not set - FileOption fileOpt = Options.getOption(FileOption.class, opts); - InputStreamOption streamOpt = - Options.getOption(InputStreamOption.class, opts); - StartOption startOpt = Options.getOption(StartOption.class, opts); - LengthOption lenOpt = Options.getOption(LengthOption.class, opts); - BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts); - OnlyHeaderOption headerOnly = - Options.getOption(OnlyHeaderOption.class, opts); - // check for consistency - if ((fileOpt == null) == (streamOpt == null)) { - throw new - IllegalArgumentException("File or stream option must be specified"); - } - if (fileOpt == null && bufOpt != null) { - throw new IllegalArgumentException("buffer size can only be set when" + - " a file is specified."); - } - // figure out the real values - Path filename = null; - FSDataInputStream file; - final long len; - if (fileOpt != null) { - filename = fileOpt.getValue(); - FileSystem fs = filename.getFileSystem(conf); - int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue(); - len = null == lenOpt - ? fs.getFileStatus(filename).getLen() - : lenOpt.getValue(); - file = openFile(fs, filename, bufSize, len); - } else { - len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); - file = streamOpt.getValue(); - } - long start = startOpt == null ? 0 : startOpt.getValue(); - // really set up - initialize(filename, file, start, len, conf, headerOnly != null); - } - - /** - * Construct a reader by opening a file from the given file system. - * @param fs The file system used to open the file. - * @param file The file being read. - * @param conf Configuration - * @throws IOException - * @deprecated Use Reader(Configuration, Option...) instead. 
- */ - @Deprecated - public Reader(FileSystem fs, Path file, - Configuration conf) throws IOException { - this(conf, file(file.makeQualified(fs))); - } - - /** - * Construct a reader by the given input stream. - * @param in An input stream. - * @param buffersize unused - * @param start The starting position. - * @param length The length being read. - * @param conf Configuration - * @throws IOException - * @deprecated Use Reader(Configuration, Reader.Option...) instead. - */ - @Deprecated - public Reader(FSDataInputStream in, int buffersize, - long start, long length, Configuration conf) throws IOException { - this(conf, stream(in), start(start), length(length)); - } - - /** Common work of the constructors. */ - private void initialize(Path filename, FSDataInputStream in, - long start, long length, Configuration conf, - boolean tempReader) throws IOException { - if (in == null) { - throw new IllegalArgumentException("in == null"); - } - this.filename = filename == null ? "<unknown>" : filename.toString(); - this.in = in; - this.conf = conf; - boolean succeeded = false; - try { - seek(start); - this.end = this.in.getPos() + length; - // if it wrapped around, use the max - if (end < length) { - end = Long.MAX_VALUE; - } - init(tempReader); - succeeded = true; - } finally { - if (!succeeded) { - IOUtils.cleanup(LOG, this.in); - } - } - } - - /** - * Override this method to specialize the type of - * {@link FSDataInputStream} returned. - * @param fs The file system used to open the file. - * @param file The file being read. - * @param bufferSize The buffer size used to read the file. - * @param length The length being read if it is >= 0. Otherwise, - * the length is not available. - * @return The opened stream. 
- * @throws IOException - */ - protected FSDataInputStream openFile(FileSystem fs, Path file, - int bufferSize, long length) throws IOException { - return fs.open(file, bufferSize); - } - - /** - * Initialize the {@link Reader} - * @param tempReader <code>true</code> if we are constructing a temporary - * reader {@link SequenceFile.Sorter#cloneFileAttributes}, - * and hence do not initialize every component; - * <code>false</code> otherwise. - * @throws IOException - */ - private void init(boolean tempReader) throws IOException { - byte[] versionBlock = new byte[VERSION.length]; - in.readFully(versionBlock); - - if ((versionBlock[0] != VERSION[0]) || - (versionBlock[1] != VERSION[1]) || - (versionBlock[2] != VERSION[2])) - throw new IOException(this + " not a SequenceFile"); - - // Set 'version' - version = versionBlock[3]; - if (version > VERSION[3]) - throw new VersionMismatchException(VERSION[3], version); - - if (version < BLOCK_COMPRESS_VERSION) { - UTF8 className = new UTF8(); - - className.readFields(in); - keyClassName = className.toString(); // key class name - - className.readFields(in); - valClassName = className.toString(); // val class name - } else { - keyClassName = Text.readString(in); - valClassName = Text.readString(in); - } - - if (version > 2) { // if version > 2 - this.decompress = in.readBoolean(); // is compressed? - } else { - decompress = false; - } - - if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 - this.blockCompressed = in.readBoolean(); // is block-compressed? - } else { - blockCompressed = false; - } - - // if version >= 5 - // setup the compression codec - if (decompress) { - if (version >= CUSTOM_COMPRESS_VERSION) { - String codecClassname = Text.readString(in); - try { - Class<? 
extends CompressionCodec> codecClass - = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); - this.codec = ReflectionUtils.newInstance(codecClass, conf); - } catch (ClassNotFoundException cnfe) { - throw new IllegalArgumentException("Unknown codec: " + - codecClassname, cnfe); - } - } else { - codec = new DefaultCodec(); - ((Configurable)codec).setConf(conf); - } - } - - this.metadata = new Metadata(); - if (version >= VERSION_WITH_METADATA) { // if version >= 6 - this.metadata.readFields(in); - } - - if (version > 1) { // if version > 1 - in.readFully(sync); // read sync bytes - headerEnd = in.getPos(); // record end of header - } - - // Initialize... *not* if this we are constructing a temporary Reader - if (!tempReader) { - valBuffer = new DataInputBuffer(); - if (decompress) { - valDecompressor = CodecPool.getDecompressor(codec); - valInFilter = codec.createInputStream(valBuffer, valDecompressor); - valIn = new DataInputStream(valInFilter); - } else { - valIn = valBuffer; - } - - if (blockCompressed) { - keyLenBuffer = new DataInputBuffer(); - keyBuffer = new DataInputBuffer(); - valLenBuffer = new DataInputBuffer(); - - keyLenDecompressor = CodecPool.getDecompressor(codec); - keyLenInFilter = codec.createInputStream(keyLenBuffer, - keyLenDecompressor); - keyLenIn = new DataInputStream(keyLenInFilter); - - keyDecompressor = CodecPool.getDecompressor(codec); - keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); - keyIn = new DataInputStream(keyInFilter); - - valLenDecompressor = CodecPool.getDecompressor(codec); - valLenInFilter = codec.createInputStream(valLenBuffer, - valLenDecompressor); - valLenIn = new DataInputStream(valLenInFilter); - } - - SerializationFactory serializationFactory = - new SerializationFactory(conf); - this.keyDeserializer = - getDeserializer(serializationFactory, getKeyClass()); - if (this.keyDeserializer == null) { - throw new IOException( - "Could not find a deserializer for the Key class: '" - + 
getKeyClass().getCanonicalName() + "'. " - + "Please ensure that the configuration '" + - CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " - + "properly configured, if you're using " - + "custom serialization."); - } - if (!blockCompressed) { - this.keyDeserializer.open(valBuffer); - } else { - this.keyDeserializer.open(keyIn); - } - this.valDeserializer = - getDeserializer(serializationFactory, getValueClass()); - if (this.valDeserializer == null) { - throw new IOException( - "Could not find a deserializer for the Value class: '" - + getValueClass().getCanonicalName() + "'. " - + "Please ensure that the configuration '" + - CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " - + "properly configured, if you're using " - + "custom serialization."); - } - this.valDeserializer.open(valIn); - } - } - - @SuppressWarnings("unchecked") - private Deserializer getDeserializer(SerializationFactory sf, Class c) { - return sf.getDeserializer(c); - } - - /** Close the file. */ - @Override - public synchronized void close() throws IOException { - // Return the decompressors to the pool - CodecPool.returnDecompressor(keyLenDecompressor); - CodecPool.returnDecompressor(keyDecompressor); - CodecPool.returnDecompressor(valLenDecompressor); - CodecPool.returnDecompressor(valDecompressor); - keyLenDecompressor = keyDecompressor = null; - valLenDecompressor = valDecompressor = null; - - if (keyDeserializer != null) { - keyDeserializer.close(); - } - if (valDeserializer != null) { - valDeserializer.close(); - } - - // Close the input-stream - in.close(); - } - - /** Returns the name of the key class. */ - public String getKeyClassName() { - return keyClassName; - } - - /** Returns the class of keys in this file. 
*/ - public synchronized Class<?> getKeyClass() { - if (null == keyClass) { - try { - keyClass = WritableName.getClass(getKeyClassName(), conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return keyClass; - } - - /** Returns the name of the value class. */ - public String getValueClassName() { - return valClassName; - } - - /** Returns the class of values in this file. */ - public synchronized Class<?> getValueClass() { - if (null == valClass) { - try { - valClass = WritableName.getClass(getValueClassName(), conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return valClass; - } - - /** Returns true if values are compressed. */ - public boolean isCompressed() { return decompress; } - - /** Returns true if records are block-compressed. */ - public boolean isBlockCompressed() { return blockCompressed; } - - /** Returns the compression codec of data in this file. */ - public CompressionCodec getCompressionCodec() { return codec; } - - /** - * Get the compression type for this file. - * @return the compression type - */ - public CompressionType getCompressionType() { - if (decompress) { - return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; - } else { - return CompressionType.NONE; - } - } - - /** Returns the metadata object of the file */ - public Metadata getMetadata() { - return this.metadata; - } - - /** Returns the configuration used for this file. 
   */
  // Package-private accessor for the Configuration this reader was constructed with.
  Configuration getConf() { return conf; }

  /**
   * Read a compressed buffer.
   * Reads a vint-length-prefixed run of bytes from the main input stream
   * {@code in} into {@code buffer}, then resets {@code filter} so the
   * decompressor starts cleanly on the freshly loaded bytes.
   * @param buffer destination buffer; reset to exactly the bytes read
   * @param filter decompression stream layered over {@code buffer}
   * @throws IOException on read failure
   */
  private synchronized void readBuffer(DataInputBuffer buffer,
                                       CompressionInputStream filter) throws IOException {
    // Read data into a temporary buffer
    DataOutputBuffer dataBuffer = new DataOutputBuffer();

    try {
      int dataBufferLength = WritableUtils.readVInt(in);
      dataBuffer.write(in, dataBufferLength);

      // Set up 'buffer' connected to the input-stream
      buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
    } finally {
      dataBuffer.close();
    }

    // Reset the codec
    filter.resetState();
  }

  /**
   * Read the next 'compressed' block.
   * Optionally skips the sync marker, then loads the key-length and key
   * buffers. Value buffers are loaded immediately unless lazyDecompress is
   * set, in which case loading is deferred to seekToCurrentValue().
   * @throws IOException if the sync marker does not match (corrupt file) or
   *         on any read failure; EOFException signals end of file to callers
   */
  private synchronized void readBlock() throws IOException {
    // Check if we need to throw away a whole block of
    // 'values' due to 'lazy decompression'
    if (lazyDecompress && !valuesDecompressed) {
      // Each seek consumes a vint and jumps that many bytes forward —
      // presumably skipping the deferred value-length and value sections
      // of the previous block (mirrors the two readBuffer calls below).
      in.seek(WritableUtils.readVInt(in)+in.getPos());
      in.seek(WritableUtils.readVInt(in)+in.getPos());
    }

    // Reset internal states
    noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
    valuesDecompressed = false;

    //Process sync
    if (sync != null) {
      in.readInt();
      in.readFully(syncCheck);                // read syncCheck
      if (!Arrays.equals(sync, syncCheck))    // check it
        throw new IOException("File is corrupt!");
    }
    // NOTE(review): set unconditionally — reported true even when sync == null.
    syncSeen = true;

    // Read number of records in this block
    noBufferedRecords = WritableUtils.readVInt(in);

    // Read key lengths and keys
    readBuffer(keyLenBuffer, keyLenInFilter);
    readBuffer(keyBuffer, keyInFilter);
    noBufferedKeys = noBufferedRecords;

    // Read value lengths and values
    if (!lazyDecompress) {
      readBuffer(valLenBuffer, valLenInFilter);
      readBuffer(valBuffer, valInFilter);
      noBufferedValues = noBufferedRecords;
      valuesDecompressed = true;
    }
  }

  /**
   * Position valLenIn/valIn to the 'value'
   * corresponding to the 'current' key.
   * Non-block-compressed: resets the value buffer (and decompressor state).
   * Block-compressed: forces any deferred (lazy) value decompression, then
   * skips past the values of records already consumed as keys.
   * @throws IOException if the skip falls short of the computed byte count
   */
  private synchronized void seekToCurrentValue() throws IOException {
    if (!blockCompressed) {
      if (decompress) {
        valInFilter.resetState();
      }
      valBuffer.reset();
    } else {
      // Check if this is the first value in the 'block' to be read
      if (lazyDecompress && !valuesDecompressed) {
        // Read the value lengths and values
        readBuffer(valLenBuffer, valLenInFilter);
        readBuffer(valBuffer, valInFilter);
        noBufferedValues = noBufferedRecords;
        valuesDecompressed = true;
      }

      // Calculate the no. of bytes to skip
      // Note: 'current' key has already been read!
      int skipValBytes = 0;
      int currentKey = noBufferedKeys + 1;
      // Consume one buffered value length per record between the buffered
      // value cursor and the current key, summing the bytes to skip.
      for (int i=noBufferedValues; i > currentKey; --i) {
        skipValBytes += WritableUtils.readVInt(valLenIn);
        --noBufferedValues;
      }

      // Skip to the 'val' corresponding to 'current' key
      if (skipValBytes > 0) {
        if (valIn.skipBytes(skipValBytes) != skipValBytes) {
          throw new IOException("Failed to seek to " + currentKey +
                                "(th) value!");
        }
      }
    }
  }

  /**
   * Get the 'value' corresponding to the last read 'key'.
   * @param val : The 'value' to be read.
   * @throws IOException if the value does not consume the whole record
   *         (non-block-compressed path) or on any read failure
   */
  public synchronized void getCurrentValue(Writable val)
    throws IOException {
    if (val instanceof Configurable) {
      ((Configurable) val).setConf(this.conf);
    }

    // Position stream to 'current' value
    seekToCurrentValue();

    if (!blockCompressed) {
      val.readFields(valIn);

      // A successful read here means bytes remain after the value:
      // the record was not fully consumed — treated as corruption.
      if (valIn.read() > 0) {
        LOG.info("available bytes: " + valIn.available());
        throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                              + " bytes, should read " +
                              (valBuffer.getLength()-keyLength));
      }
    } else {
      // Get the value
      int valLength = WritableUtils.readVInt(valLenIn);
      val.readFields(valIn);

      // Read another compressed 'value'
      --noBufferedValues;

      // Sanity check
      if ((valLength < 0) && LOG.isDebugEnabled()) {
        LOG.debug(val + " is a zero-length value");
      }
    }

  }

  /**
   * Get the 'value' corresponding to the last read 'key'.
   * @param val : The 'value' to be read.
   * @throws IOException
   */
  // Serializer-framework counterpart of getCurrentValue(Writable): the
  // deserializer may reuse 'val' or return a fresh object, so callers must
  // use the returned reference (note 'val' is reassigned below).
  public synchronized Object getCurrentValue(Object val)
    throws IOException {
    if (val instanceof Configurable) {
      ((Configurable) val).setConf(this.conf);
    }

    // Position stream to 'current' value
    seekToCurrentValue();

    if (!blockCompressed) {
      val = deserializeValue(val);

      // A successful read here means bytes remain after the value:
      // the record was not fully consumed — treated as corruption.
      if (valIn.read() > 0) {
        LOG.info("available bytes: " + valIn.available());
        throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                              + " bytes, should read " +
                              (valBuffer.getLength()-keyLength));
      }
    } else {
      // Get the value
      int valLength = WritableUtils.readVInt(valLenIn);
      val = deserializeValue(val);

      // Read another compressed 'value'
      --noBufferedValues;

      // Sanity check
      if ((valLength < 0) && LOG.isDebugEnabled()) {
        LOG.debug(val + " is a zero-length value");
      }
    }
    return val;

  }

  // Delegates to valDeserializer (opened over valIn during init);
  // raw-typed call is what requires the unchecked suppression.
  @SuppressWarnings("unchecked")
  private Object deserializeValue(Object val) throws IOException {
    return valDeserializer.deserialize(val);
  }

  /** Read the next key in the file into <code>key</code>, skipping its
   * value.  True if another entry exists, and false at end of file.
   */
  public synchronized boolean next(Writable key) throws IOException {
    if (key.getClass() != getKeyClass())
      throw new IOException("wrong key class: "+key.getClass().getName()
                            +" is not "+keyClass);

    if (!blockCompressed) {
      outBuf.reset();

      // Loads the whole record into outBuf; field 'keyLength' records how
      // many of those bytes belong to the key (-1 at end of file).
      keyLength = next(outBuf);
      if (keyLength < 0)
        return false;

      valBuffer.reset(outBuf.getData(), outBuf.getLength());

      key.readFields(valBuffer);
      valBuffer.mark(0);
      if (valBuffer.getPosition() != keyLength)
        throw new IOException(key + " read " + valBuffer.getPosition()
                              + " bytes, should read " + keyLength);
    } else {
      //Reset syncSeen
      syncSeen = false;

      // Refill the block buffers when all buffered keys are consumed;
      // EOF while reading a block simply means no more records.
      if (noBufferedKeys == 0) {
        try {
          readBlock();
        } catch (EOFException eof) {
          return false;
        }
      }

      // NOTE(review): this local shadows the 'keyLength' field — the field
      // is only updated on the non-block-compressed path above.
      int keyLength = WritableUtils.readVInt(keyLenIn);

      // Sanity check
      if (keyLength < 0) {
        return false;
      }

      //Read another compressed 'key'
      key.readFields(keyIn);
      --noBufferedKeys;
    }

    return true;
  }

  /** Read the next key/value pair in the file into <code>key</code> and
   * <code>val</code>.  Returns true if such a pair exists and false when at
   * end of file */
  public synchronized boolean next(Writable key, Writable val)
    throws IOException {
    if (val.getClass() != getValueClass())
      throw new IOException("wrong value class: "+val+" is not "+valClass);

    // Advance to the next key; only fetch the value if a record exists.
    boolean more = next(key);

    if (more) {
      getCurrentValue(val);
    }

    return more;
  }

  /**
   * Read and return the next record length, potentially skipping over
   * a sync block.
   * @return the length of the next record or -1 if there is no next record
   * @throws IOException
   */
  private synchronized int readRecordLength() throws IOException {
    if (in.getPos() >= end) {
      return -1;
    }
    int length = in.readInt();
    // A length equal to SYNC_ESCAPE marks a sync entry (version > 1 files
    // with a sync block): verify the sync bytes, then re-read the real
    // record length that follows.
    if (version > 1 && sync != null &&
        length == SYNC_ESCAPE) {              // process a sync entry
      in.readFully(syncCheck);                // read syncCheck
      if (!Arrays.equals(sync, syncCheck))    // check it
        throw new IOException("File is corrupt!");
      syncSeen = true;
      if (in.getPos() >= end) {
        return -1;
      }
      length = in.readInt();                  // re-read length
    } else {
      syncSeen = false;
    }

    return length;
  }

  /** Read the next key/value pair in the file into <code>buffer</code>.
   * Returns the length of the key read, or -1 if at end of file.  The length
   * of the value may be computed by calling buffer.getLength() before and
   * after calls to this method. */
  /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
  @Deprecated
  synchronized int next(DataOutputBuffer buffer) throws IOException {
    // Unsupported for block-compressed sequence files
    if (blockCompressed) {
      throw new IOException("Unsupported call for block-compressed" +
                            " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
    }
    try {
      int length = readRecordLength();
      if (length == -1) {
        return -1;
      }
      int keyLength = in.readInt();
      buffer.write(in, length);
      return keyLength;
    } catch (ChecksumException e) {     // checksum failure
      // After the handler repositions/recovers, retry recursively for the
      // next readable record.
      handleChecksumException(e);
      return next(buffer);
    }
  }

  /**
   * Create a ValueBytes holder matching this file's compression: raw bytes
   * unless values are record-compressed (block-compressed data is already
   * decompressed into buffers by readBlock).
   */
  public ValueBytes createValueBytes() {
    ValueBytes val = null;
    if (!decompress || blockCompressed) {
      val = new UncompressedBytes();
    } else {
      val = new CompressedBytes(codec);
    }
    return val;
  }

  /**
   * Read 'raw' records.
- * @param key - The buffer into which the key is read - * @param val - The 'raw' value - * @return Returns the total record length or -1 for end of file - * @throws IOException - */ - public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) - throws IOException { - if (!blockCompressed) { - int length = readRecordLength(); - if (length == -1) { - return -1; - } - int keyLength = in.readInt(); - int valLength = le
<TRUNCATED>