Repository: parquet-mr
Updated Branches:
  refs/heads/master 81f480149 -> 8bfd9b4d8
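The ParquetWriter diff below routes the Path-based constructors through the new OutputFile abstraction and adds a config(String, String) hook on the builder. A minimal sketch of how a concrete builder subclass would use both; the BuilderConfigExample class, output path, and schema are illustrative only, and "parquet.example.schema" is assumed to be the configuration key that GroupWriteSupport reads its schema from:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class BuilderConfigExample {
  // ParquetWriter.Builder leaves self() and getWriteSupport(Configuration)
  // abstract; a concrete subclass supplies both.
  static class Builder extends ParquetWriter.Builder<Group, Builder> {
    Builder(Path path) {
      super(path);
    }

    @Override
    protected Builder self() {
      return this;
    }

    @Override
    protected WriteSupport<Group> getWriteSupport(Configuration conf) {
      return new GroupWriteSupport(); // resolves its schema from conf
    }
  }

  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required binary name (UTF8); }");

    // config() hands properties to the write path without callers building
    // a Hadoop Configuration by hand; build() goes through the Path branch
    // and wraps it with HadoopOutputFile.fromPath.
    try (ParquetWriter<Group> writer = new Builder(new Path("/tmp/builder-config-demo.parquet"))
        .config("parquet.example.schema", schema.toString())
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .build()) {
      writer.write(new SimpleGroupFactory(schema).newGroup().append("name", "parquet"));
    }
  }
}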
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 9512b93..bdde70e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -28,6 +28,8 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.io.OutputFile; import org.apache.parquet.schema.MessageType; /** @@ -219,7 +221,8 @@ public class ParquetWriter<T> implements Closeable { boolean validating, WriterVersion writerVersion, Configuration conf) throws IOException { - this(file, mode, writeSupport, compressionCodecName, blockSize, + this(HadoopOutputFile.fromPath(file, conf), + mode, writeSupport, compressionCodecName, blockSize, validating, conf, MAX_PADDING_SIZE_DEFAULT, ParquetProperties.builder() .withPageSize(pageSize) @@ -257,11 +260,11 @@ public class ParquetWriter<T> implements Closeable { } ParquetWriter( - Path file, + OutputFile file, ParquetFileWriter.Mode mode, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, - int blockSize, + int rowGroupSize, boolean validating, Configuration conf, int maxPaddingSize, @@ -271,7 +274,7 @@ public class ParquetWriter<T> implements Closeable { MessageType schema = writeContext.getSchema(); ParquetFileWriter fileWriter = new ParquetFileWriter( - conf, schema, file, mode, blockSize, maxPaddingSize); + file, schema, mode, rowGroupSize, maxPaddingSize); fileWriter.start(); this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold()); @@ -281,7 +284,7 @@ public class ParquetWriter<T> implements Closeable { writeSupport, schema, writeContext.getExtraMetaData(), - blockSize, + rowGroupSize, compressor, validating, encodingProps); @@ -324,7 +327,8 @@ public class ParquetWriter<T> implements Closeable { * @param <SELF> The type of this builder that is returned by builder methods */ public abstract static class Builder<T, SELF extends Builder<T, SELF>> { - private final Path file; + private OutputFile file = null; + private Path path = null; private Configuration conf = new Configuration(); private ParquetFileWriter.Mode mode; private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME; @@ -334,8 +338,12 @@ public class ParquetWriter<T> implements Closeable { private ParquetProperties.Builder encodingPropsBuilder = ParquetProperties.builder(); - protected Builder(Path path) { - this.path = path; + protected Builder(Path path) { + this.path = path; + } + + protected Builder(OutputFile path) { + this.file = path; } /** @@ -485,15 +493,35 @@ public class ParquetWriter<T> implements Closeable { } /** + * Set a property that will be available to the write path. For writers that use a Hadoop + * configuration, this is the recommended way to add configuration values. + * + * @param property a String property name + * @param value a String property value + * @return this builder for method chaining.
+ */ + public SELF config(String property, String value) { + conf.set(property, value); + return self(); + } + + /** * Build a {@link ParquetWriter} with the accumulated configuration. * * @return a configured {@code ParquetWriter} instance. * @throws IOException */ public ParquetWriter<T> build() throws IOException { - return new ParquetWriter<T>(file, mode, getWriteSupport(conf), codecName, - rowGroupSize, enableValidation, conf, maxPaddingSize, - encodingPropsBuilder.build()); + if (file != null) { + return new ParquetWriter<>(file, + mode, getWriteSupport(conf), codecName, rowGroupSize, enableValidation, conf, + maxPaddingSize, encodingPropsBuilder.build()); + } else { + return new ParquetWriter<>(HadoopOutputFile.fromPath(path, conf), + mode, getWriteSupport(conf), codecName, + rowGroupSize, enableValidation, conf, maxPaddingSize, + encodingPropsBuilder.build()); + } } } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java index 4696319..a70a0d0 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java @@ -20,10 +20,12 @@ package org.apache.parquet.hadoop; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; // Essentially taken from: // https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java#L124 @@ -60,6 +62,10 @@ public class UnmaterializableRecordCounter { ); } + public UnmaterializableRecordCounter(ParquetReadOptions options, long totalNumRecords) { + this(getFloat(options, BAD_RECORD_THRESHOLD_CONF_KEY, DEFAULT_THRESHOLD), totalNumRecords); + } + public UnmaterializableRecordCounter(double errorThreshold, long totalNumRecords) { this.errorThreshold = errorThreshold; this.totalNumRecords = totalNumRecords; @@ -85,4 +91,13 @@ public class UnmaterializableRecordCounter { throw new ParquetDecodingException(message, cause); } } + + private static float getFloat(ParquetReadOptions options, String key, float defaultValue) { + String value = options.getProperty(key); + if (value != null) { + return Float.valueOf(value); + } else { + return defaultValue; + } + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java deleted file mode 100644 index 9657cc1..0000000 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.hadoop.codec; - -/** - * This exception will be thrown when the codec is not supported by parquet, meaning there is no - * matching codec defined in {@link org.apache.parquet.hadoop.metadata.CompressionCodecName} - */ -public class CompressionCodecNotSupportedException extends RuntimeException { - private final Class codecClass; - - public CompressionCodecNotSupportedException(Class codecClass) { - super("codec not supported: " + codecClass.getName()); - this.codecClass = codecClass; - } - - public Class getCodecClass() { - return codecClass; - } -} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java deleted file mode 100644 index 153133e..0000000 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.parquet.hadoop.metadata; - -import org.apache.parquet.format.CompressionCodec; -import org.apache.parquet.hadoop.codec.CompressionCodecNotSupportedException; - -import java.util.Locale; - -public enum CompressionCodecName { - UNCOMPRESSED(null, CompressionCodec.UNCOMPRESSED, ""), - SNAPPY("org.apache.parquet.hadoop.codec.SnappyCodec", CompressionCodec.SNAPPY, ".snappy"), - GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"), - LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"), - BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"), - LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"), - ZSTD("org.apache.hadoop.io.compress.ZStandardCodec", CompressionCodec.ZSTD, ".zstd"); - - public static CompressionCodecName fromConf(String name) { - if (name == null) { - return UNCOMPRESSED; - } - return valueOf(name.toUpperCase(Locale.ENGLISH)); - } - - public static CompressionCodecName fromCompressionCodec(Class<?> clazz) { - if (clazz == null) { - return UNCOMPRESSED; - } - String name = clazz.getName(); - for (CompressionCodecName codec : CompressionCodecName.values()) { - if (name.equals(codec.getHadoopCompressionCodecClassName())) { - return codec; - } - } - throw new CompressionCodecNotSupportedException(clazz); - } - - public static CompressionCodecName fromParquet(CompressionCodec codec) { - for (CompressionCodecName codecName : CompressionCodecName.values()) { - if (codec.equals(codecName.parquetCompressionCodec)) { - return codecName; - } - } - throw new IllegalArgumentException("Unknown compression codec " + codec); - } - - private final String hadoopCompressionCodecClass; - private final CompressionCodec parquetCompressionCodec; - private final String extension; - - private CompressionCodecName(String hadoopCompressionCodecClass, CompressionCodec parquetCompressionCodec, String extension) { - this.hadoopCompressionCodecClass = hadoopCompressionCodecClass; - this.parquetCompressionCodec = parquetCompressionCodec; - this.extension = extension; - } - - public String getHadoopCompressionCodecClassName() { - return hadoopCompressionCodecClass; - } - - public Class getHadoopCompressionCodecClass() { - String codecClassName = getHadoopCompressionCodecClassName(); - if (codecClassName==null) { - return null; - } - try { - return Class.forName(codecClassName); - } catch (ClassNotFoundException e) { - return null; - } - } - - public CompressionCodec getParquetCompressionCodec() { - return parquetCompressionCodec; - } - - public String getExtension() { - return extension; - } - -} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java index 4a03b1a..876a1f3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java @@ -20,32 +20,23 @@ package org.apache.parquet.hadoop.util; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.parquet.io.SeekableInputStream; -import java.io.EOFException; +import org.apache.parquet.io.DelegatingSeekableInputStream; import java.io.IOException; 
-import java.nio.ByteBuffer; /** * SeekableInputStream implementation that implements read(ByteBuffer) for * Hadoop 1 FSDataInputStream. */ -class H1SeekableInputStream extends SeekableInputStream { - - private final int COPY_BUFFER_SIZE = 8192; - private final byte[] temp = new byte[COPY_BUFFER_SIZE]; +class H1SeekableInputStream extends DelegatingSeekableInputStream { private final FSDataInputStream stream; public H1SeekableInputStream(FSDataInputStream stream) { + super(stream); this.stream = stream; } @Override - public void close() throws IOException { - stream.close(); - } - - @Override public long getPos() throws IOException { return stream.getPos(); } @@ -56,16 +47,6 @@ class H1SeekableInputStream extends SeekableInputStream { } @Override - public int read() throws IOException { - return stream.read(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return stream.read(b, off, len); - } - - @Override public void readFully(byte[] bytes) throws IOException { stream.readFully(bytes, 0, bytes.length); } @@ -75,80 +56,4 @@ class H1SeekableInputStream extends SeekableInputStream { stream.readFully(bytes); } - @Override - public int read(ByteBuffer buf) throws IOException { - if (buf.hasArray()) { - return readHeapBuffer(stream, buf); - } else { - return readDirectBuffer(stream, buf, temp); - } - } - - @Override - public void readFully(ByteBuffer buf) throws IOException { - if (buf.hasArray()) { - readFullyHeapBuffer(stream, buf); - } else { - readFullyDirectBuffer(stream, buf, temp); - } - } - - // Visible for testing - static int readHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException { - int bytesRead = f.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); - if (bytesRead < 0) { - // if this resulted in EOF, don't update position - return bytesRead; - } else { - buf.position(buf.position() + bytesRead); - return bytesRead; - } - } - - // Visible for testing - static void readFullyHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException { - f.readFully(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); - buf.position(buf.limit()); - } - - // Visible for testing - static int readDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException { - // copy all the bytes that return immediately, stopping at the first - // read that doesn't return a full buffer. - int nextReadLength = Math.min(buf.remaining(), temp.length); - int totalBytesRead = 0; - int bytesRead; - - while ((bytesRead = f.read(temp, 0, nextReadLength)) == temp.length) { - buf.put(temp); - totalBytesRead += bytesRead; - nextReadLength = Math.min(buf.remaining(), temp.length); - } - - if (bytesRead < 0) { - // return -1 if nothing was read - return totalBytesRead == 0 ? -1 : totalBytesRead; - } else { - // copy the last partial buffer - buf.put(temp, 0, bytesRead); - totalBytesRead += bytesRead; - return totalBytesRead; - } - } - - // Visible for testing - static void readFullyDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException { - int nextReadLength = Math.min(buf.remaining(), temp.length); - int bytesRead = 0; - - while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) { - buf.put(temp, 0, bytesRead); - nextReadLength = Math.min(buf.remaining(), temp.length); - } - - if (bytesRead < 0 && buf.remaining() > 0) { - throw new EOFException( - "Reached the end of stream. 
Still have: " + buf.remaining() + " bytes left"); - } - } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java index ec4567e..c68f6b6 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java @@ -20,7 +20,7 @@ package org.apache.parquet.hadoop.util; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.io.DelegatingSeekableInputStream; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -29,7 +29,7 @@ import java.nio.ByteBuffer; * SeekableInputStream implementation for FSDataInputStream that implements * ByteBufferReadable in Hadoop 2. */ -class H2SeekableInputStream extends SeekableInputStream { +class H2SeekableInputStream extends DelegatingSeekableInputStream { // Visible for testing interface Reader { @@ -40,6 +40,7 @@ class H2SeekableInputStream extends SeekableInputStream { private final Reader reader; public H2SeekableInputStream(FSDataInputStream stream) { + super(stream); this.stream = stream; this.reader = new H2Reader(); } @@ -60,21 +61,6 @@ class H2SeekableInputStream extends SeekableInputStream { } @Override - public int read() throws IOException { - return stream.read(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return stream.read(b, off, len); - } - - @Override - public void readFully(byte[] bytes) throws IOException { - stream.readFully(bytes, 0, bytes.length); - } - - @Override public void readFully(byte[] bytes, int start, int len) throws IOException { stream.readFully(bytes); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java new file mode 100644 index 0000000..a46c8db --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.hadoop.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.hadoop.CodecFactory; + +public class HadoopCodecs { + public static CompressionCodecFactory newFactory(int sizeHint) { + return new CodecFactory(new Configuration(), sizeHint); + } + + public static CompressionCodecFactory newFactory(Configuration conf, int sizeHint) { + return new CodecFactory(conf, sizeHint); + } + + public static CompressionCodecFactory newDirectFactory(Configuration conf, ByteBufferAllocator allocator, int sizeHint) { + return CodecFactory.createDirectCodecFactory(conf, allocator, sizeHint); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java new file mode 100644 index 0000000..4740fd4 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +public class HadoopOutputFile implements OutputFile { + // need to supply a buffer size when setting block size. this is the default + // from hadoop 1 to the present. copying it avoids loading DFSConfigKeys.
+ private static final int DFS_BUFFER_SIZE_DEFAULT = 4096; + + private static final Set<String> BLOCK_FS_SCHEMES = new HashSet<String>(); + static { + BLOCK_FS_SCHEMES.add("hdfs"); + BLOCK_FS_SCHEMES.add("webhdfs"); + BLOCK_FS_SCHEMES.add("viewfs"); + } + + // visible for testing + public static Set<String> getBlockFileSystems() { + return BLOCK_FS_SCHEMES; + } + + private static boolean supportsBlockSize(FileSystem fs) { + return BLOCK_FS_SCHEMES.contains(fs.getUri().getScheme()); + } + + private final FileSystem fs; + private final Path path; + private final Configuration conf; + + public static HadoopOutputFile fromPath(Path path, Configuration conf) + throws IOException { + FileSystem fs = path.getFileSystem(conf); + return new HadoopOutputFile(fs, fs.makeQualified(path), conf); + } + + private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) { + this.fs = fs; + this.path = path; + this.conf = conf; + } + + public Configuration getConfiguration() { + return conf; + } + + @Override + public PositionOutputStream create(long blockSizeHint) throws IOException { + return HadoopStreams.wrap(fs.create(path, false /* do not overwrite */, + DFS_BUFFER_SIZE_DEFAULT, fs.getDefaultReplication(path), + Math.max(fs.getDefaultBlockSize(path), blockSizeHint))); + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException { + return HadoopStreams.wrap(fs.create(path, true /* overwrite if exists */, + DFS_BUFFER_SIZE_DEFAULT, fs.getDefaultReplication(path), + Math.max(fs.getDefaultBlockSize(path), blockSizeHint))); + } + + @Override + public boolean supportsBlockSize() { + return supportsBlockSize(fs); + } + + @Override + public long defaultBlockSize() { + return fs.getDefaultBlockSize(path); + } + + @Override + public String toString() { + return path.toString(); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java new file mode 100644 index 0000000..4b194aa --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.hadoop.util; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.parquet.io.PositionOutputStream; +import java.io.IOException; + +public class HadoopPositionOutputStream extends PositionOutputStream { + private final FSDataOutputStream wrapped; + + HadoopPositionOutputStream(FSDataOutputStream wrapped) { + this.wrapped = wrapped; + } + + @Override + public long getPos() throws IOException { + return wrapped.getPos(); + } + + @Override + public void write(int b) throws IOException { + wrapped.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + wrapped.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + wrapped.write(b, off, len); + } + + public void sync() throws IOException { + wrapped.hsync(); + } + + @Override + public void flush() throws IOException { + wrapped.flush(); + } + + @Override + public void close() throws IOException { + wrapped.close(); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java index 8731bd6..c35e98f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java @@ -20,8 +20,11 @@ package org.apache.parquet.hadoop.util; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.parquet.Preconditions; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.io.PositionOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +49,7 @@ public class HadoopStreams { * @return a SeekableInputStream */ public static SeekableInputStream wrap(FSDataInputStream stream) { + Preconditions.checkNotNull(stream, "Cannot wrap a null input stream"); if (byteBufferReadableClass != null && h2SeekableConstructor != null && byteBufferReadableClass.isInstance(stream.getWrappedStream())) { try { @@ -99,4 +103,15 @@ public class HadoopStreams { return null; } + /** + * Wraps a {@link FSDataOutputStream} in a {@link PositionOutputStream} + * implementation for Parquet writers. 
+ * + * @param stream a Hadoop FSDataOutputStream + * @return a PositionOutputStream + */ + public static PositionOutputStream wrap(FSDataOutputStream stream) { + Preconditions.checkNotNull(stream, "Cannot wrap a null output stream"); + return new HadoopPositionOutputStream(stream); + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java index 0ac9c0f..8e3e6c7 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java @@ -35,6 +35,8 @@ import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.example.GroupWriteSupport; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.io.OutputFile; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Types; @@ -105,7 +107,7 @@ public class TestInputOutputFormatWithPadding { @Test public void testBasicBehaviorWithPadding() throws Exception { - ParquetFileWriter.BLOCK_FS_SCHEMES.add("file"); + HadoopOutputFile.getBlockFileSystems().add("file"); File inputFile = temp.newFile(); FileOutputStream out = new FileOutputStream(inputFile); @@ -186,7 +188,7 @@ public class TestInputOutputFormatWithPadding { Assert.assertEquals("Should match written file content", FILE_CONTENT, reconstructed); - ParquetFileWriter.BLOCK_FS_SCHEMES.remove("file"); + HadoopOutputFile.getBlockFileSystems().remove("file"); } private void waitForJob(Job job) throws Exception { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 1442e04..6915c86 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.Version; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; +import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.junit.Assume; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java new file mode 100644 index 0000000..b41b3c8 --- /dev/null +++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop.util; + +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; +import java.io.ByteArrayInputStream; +import java.io.IOException; + +class MockHadoopInputStream extends ByteArrayInputStream + implements Seekable, PositionedReadable { + static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + + private int[] lengths; + private int current = 0; + MockHadoopInputStream(int... actualReadLengths) { + super(TEST_ARRAY); + this.lengths = actualReadLengths; + } + + @Override + public synchronized int read(byte[] b, int off, int len) { + if (current < lengths.length) { + if (len <= lengths[current]) { + // when len == lengths[current], the next read will be 0 bytes + int bytesRead = super.read(b, off, len); + lengths[current] -= bytesRead; + return bytesRead; + } else { + int bytesRead = super.read(b, off, lengths[current]); + current += 1; + return bytesRead; + } + } else { + return super.read(b, off, len); + } + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) throws IOException { + seek(position); + return read(buffer, offset, length); + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + throw new UnsupportedOperationException("Not actually supported."); + } + + @Override + public void readFully(long position, byte[] buffer) throws IOException { + throw new UnsupportedOperationException("Not actually supported."); + } + + @Override + public void seek(long pos) throws IOException { + this.pos = (int) pos; + } + + @Override + public long getPos() throws IOException { + return this.pos; + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + seek(targetPos); + return true; + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java deleted file mode 100644 index a112288..0000000 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.parquet.hadoop.util; - -import org.apache.hadoop.fs.PositionedReadable; -import org.apache.hadoop.fs.Seekable; -import java.io.ByteArrayInputStream; -import java.io.IOException; - -class MockInputStream extends ByteArrayInputStream - implements Seekable, PositionedReadable { - static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; - - private int[] lengths; - private int current = 0; - MockInputStream(int... actualReadLengths) { - super(TEST_ARRAY); - this.lengths = actualReadLengths; - } - - @Override - public synchronized int read(byte[] b, int off, int len) { - if (current < lengths.length) { - if (len <= lengths[current]) { - // when len == lengths[current], the next read will by 0 bytes - int bytesRead = super.read(b, off, len); - lengths[current] -= bytesRead; - return bytesRead; - } else { - int bytesRead = super.read(b, off, lengths[current]); - current += 1; - return bytesRead; - } - } else { - return super.read(b, off, len); - } - } - - @Override - public int read(long position, byte[] buffer, int offset, int length) throws IOException { - seek(position); - return read(buffer, offset, length); - } - - @Override - public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { - throw new UnsupportedOperationException("Not actually supported."); - } - - @Override - public void readFully(long position, byte[] buffer) throws IOException { - throw new UnsupportedOperationException("Not actually supported."); - } - - @Override - public void seek(long pos) throws IOException { - this.pos = (int) pos; - } - - @Override - public long getPos() throws IOException { - return this.pos; - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - seek(targetPos); - return true; - } -} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java deleted file mode 100644 index 9e4e2a9..0000000 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java +++ /dev/null @@ -1,761 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.parquet.hadoop.util; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.parquet.hadoop.TestUtils; -import org.junit.Assert; -import org.junit.Test; -import java.io.EOFException; -import java.nio.ByteBuffer; -import java.util.concurrent.Callable; - -import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY; - -public class TestHadoop1ByteBufferReads { - - private static final ThreadLocal<byte[]> TEMP = new ThreadLocal<byte[]>() { - @Override - protected byte[] initialValue() { - return new byte[8192]; - } - }; - - @Test - public void testHeapRead() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(20); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream()); - - int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(10, len); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(20, readBuffer.limit()); - - len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(-1, len); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", - ByteBuffer.wrap(TEST_ARRAY), readBuffer); - } - - @Test - public void testHeapSmallBuffer() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(5); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream()); - - int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(5, len); - Assert.assertEquals(5, readBuffer.position()); - Assert.assertEquals(5, readBuffer.limit()); - - len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(0, len); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", - ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer); - } - - @Test - public void testHeapSmallReads() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(10); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3)); - - int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(2, len); - Assert.assertEquals(2, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(3, len); - Assert.assertEquals(5, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(3, len); - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(2, len); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", - ByteBuffer.wrap(TEST_ARRAY), readBuffer); - } - - @Test - public void testHeapPosition() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(20); - readBuffer.position(10); - 
readBuffer.mark(); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8)); - - int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(8, len); - Assert.assertEquals(18, readBuffer.position()); - Assert.assertEquals(20, readBuffer.limit()); - - len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(2, len); - Assert.assertEquals(20, readBuffer.position()); - Assert.assertEquals(20, readBuffer.limit()); - - len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(-1, len); - - readBuffer.reset(); - Assert.assertEquals("Buffer contents should match", - ByteBuffer.wrap(TEST_ARRAY), readBuffer); - } - - @Test - public void testHeapLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(20); - readBuffer.limit(8); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7)); - - int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(7, len); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); - - len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(1, len); - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); - - len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(0, len); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", - ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer); - } - - @Test - public void testHeapPositionAndLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(20); - readBuffer.position(5); - readBuffer.limit(13); - readBuffer.mark(); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7)); - - int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(7, len); - Assert.assertEquals(12, readBuffer.position()); - Assert.assertEquals(13, readBuffer.limit()); - - len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(1, len); - Assert.assertEquals(13, readBuffer.position()); - Assert.assertEquals(13, readBuffer.limit()); - - len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer); - Assert.assertEquals(0, len); - - readBuffer.reset(); - Assert.assertEquals("Buffer contents should match", - ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer); - } - - @Test - public void testDirectRead() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(20); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream()); - - int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(10, len); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(20, readBuffer.limit()); - - len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(-1, len); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", - ByteBuffer.wrap(TEST_ARRAY), readBuffer); - } - - @Test - public void testDirectSmallBuffer() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(5); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream()); - - int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(5, len); - Assert.assertEquals(5, 
readBuffer.position()); - Assert.assertEquals(5, readBuffer.limit()); - - len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(0, len); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", - ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer); - } - - @Test - public void testDirectSmallReads() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3)); - - int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(2, len); - Assert.assertEquals(2, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(3, len); - Assert.assertEquals(5, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(3, len); - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(2, len); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", - ByteBuffer.wrap(TEST_ARRAY), readBuffer); - } - - @Test - public void testDirectPosition() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(20); - readBuffer.position(10); - readBuffer.mark(); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8)); - - int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(8, len); - Assert.assertEquals(18, readBuffer.position()); - Assert.assertEquals(20, readBuffer.limit()); - - len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(2, len); - Assert.assertEquals(20, readBuffer.position()); - Assert.assertEquals(20, readBuffer.limit()); - - len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(-1, len); - - readBuffer.reset(); - Assert.assertEquals("Buffer contents should match", - ByteBuffer.wrap(TEST_ARRAY), readBuffer); - } - - @Test - public void testDirectLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(20); - readBuffer.limit(8); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7)); - - int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(7, len); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); - - len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(1, len); - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); - - len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get()); - Assert.assertEquals(0, len); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", - ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer); - } - - @Test - public void testDirectPositionAndLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(20); - readBuffer.position(5); - readBuffer.limit(13); - 
readBuffer.mark();
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
-    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(7, len);
-    Assert.assertEquals(12, readBuffer.position());
-    Assert.assertEquals(13, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(1, len);
-    Assert.assertEquals(13, readBuffer.position());
-    Assert.assertEquals(13, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(0, len);
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
-  }
-
-  @Test
-  public void testDirectSmallTempBufferSmallReads() throws Exception {
-    byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
-
-    ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(2, len);
-    Assert.assertEquals(2, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(3, len);
-    Assert.assertEquals(5, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(3, len);
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(2, len);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(-1, len);
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testDirectSmallTempBufferWithPositionAndLimit() throws Exception {
-    byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
-
-    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
-    readBuffer.position(5);
-    readBuffer.limit(13);
-    readBuffer.mark();
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
-    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(7, len);
-    Assert.assertEquals(12, readBuffer.position());
-    Assert.assertEquals(13, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(1, len);
-    Assert.assertEquals(13, readBuffer.position());
-    Assert.assertEquals(13, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(0, len);
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullySmallBuffer() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocate(8);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullyLargeBuffer() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(20);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    TestUtils.assertThrows("Should throw EOFException",
-        EOFException.class, new Callable() {
-          @Override
-          public Object call() throws Exception {
-            H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-            return null;
-          }
-        });
-
-    Assert.assertEquals(0, readBuffer.position());
-    Assert.assertEquals(20, readBuffer.limit());
-  }
-
-  @Test
-  public void testHeapReadFullyJustRight() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    // reads all of the bytes available without EOFException
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    // trying to read 0 more bytes doesn't result in EOFException
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullySmallReads() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullyPosition() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-    readBuffer.position(3);
-    readBuffer.mark();
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullyLimit() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-    readBuffer.limit(7);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-
-    readBuffer.position(7);
-    readBuffer.limit(10);
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullyPositionAndLimit() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-    readBuffer.position(3);
-    readBuffer.limit(7);
-    readBuffer.mark();
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
-    readBuffer.position(7);
-    readBuffer.limit(10);
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullySmallBuffer() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullyLargeBuffer() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    TestUtils.assertThrows("Should throw EOFException",
-        EOFException.class, new Callable() {
-          @Override
-          public Object call() throws Exception {
-            H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-            return null;
-          }
-        });
-
-    // NOTE: This behavior differs from readFullyHeapBuffer because direct uses
-    // several read operations that will read up to the end of the input. This
-    // is a correct value because the bytes in the buffer are valid. This
-    // behavior can't be implemented for the heap buffer without using the read
-    // method instead of the readFully method on the underlying
-    // FSDataInputStream.
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(20, readBuffer.limit());
-  }
-
-  @Test
-  public void testDirectReadFullyJustRight() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    // reads all of the bytes available without EOFException
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    // trying to read 0 more bytes doesn't result in EOFException
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullySmallReads() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullyPosition() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-    readBuffer.position(3);
-    readBuffer.mark();
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullyLimit() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-    readBuffer.limit(7);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-
-    readBuffer.position(7);
-    readBuffer.limit(10);
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullyPositionAndLimit() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-    readBuffer.position(3);
-    readBuffer.limit(7);
-    readBuffer.mark();
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
-    readBuffer.position(7);
-    readBuffer.limit(10);
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullySmallTempBufferWithPositionAndLimit() throws Exception {
-    byte[] temp = new byte[2]; // this will cause readFully to loop
-
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-    readBuffer.position(3);
-    readBuffer.limit(7);
-    readBuffer.mark();
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
-    readBuffer.position(7);
-    readBuffer.limit(10);
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-  }
-}
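The removed Hadoop-1 direct-buffer tests above all stage bytes through a small heap array, since a direct ByteBuffer has no backing array to read into. Below is a minimal sketch of that copy loop in the spirit of readFullyDirectBuffer; the class and method names are illustrative, not the patch's code. It also shows why, per the NOTE in testDirectReadFullyLargeBuffer, hitting EOF leaves the buffer position at the last byte copied instead of rewinding it:

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;

    // Illustrative sketch only, not the patch's implementation.
    class ReadFullyDirectSketch {
      static void readFullyDirect(InputStream in, ByteBuffer buf, byte[] temp) throws IOException {
        while (buf.hasRemaining()) {
          // Stage through the heap array; never ask for more than fits in buf.
          int bytesRead = in.read(temp, 0, Math.min(temp.length, buf.remaining()));
          if (bytesRead < 0) {
            // Earlier put() calls already advanced buf.position(), so the
            // bytes copied so far stay valid even though we throw here.
            throw new EOFException("Reached end of stream with " + buf.remaining() + " bytes left to read");
          }
          buf.put(temp, 0, bytesRead); // advances buf.position() by bytesRead
        }
      }
    }

A temp array smaller than the buffer, like the byte[2] used in the small-temp-buffer tests, simply forces more iterations of the loop.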
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
index 86b903c..68c9b3b 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
@@ -28,7 +28,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
 
-import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY;
+import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY;
 
 public class TestHadoop2ByteBufferReads {
 
@@ -59,7 +59,7 @@ public class TestHadoop2ByteBufferReads {
   public void testHeapReadFullySmallBuffer() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocate(8);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
 
     MockBufferReader reader = new MockBufferReader(hadoopStream);
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -79,7 +79,7 @@ public class TestHadoop2ByteBufferReads {
   public void testHeapReadFullyLargeBuffer() throws Exception {
     final ByteBuffer readBuffer = ByteBuffer.allocate(20);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
 
     final MockBufferReader reader = new MockBufferReader(hadoopStream);
     TestUtils.assertThrows("Should throw EOFException",
@@ -105,7 +105,7 @@ public class TestHadoop2ByteBufferReads {
   public void testHeapReadFullyJustRight() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocate(10);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
 
     MockBufferReader reader = new MockBufferReader(hadoopStream);
     // reads all of the bytes available without EOFException
@@ -127,7 +127,7 @@ public class TestHadoop2ByteBufferReads {
   public void testHeapReadFullySmallReads() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocate(10);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
 
     MockBufferReader reader = new MockBufferReader(hadoopStream);
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -149,7 +149,7 @@ public class TestHadoop2ByteBufferReads {
     readBuffer.position(3);
     readBuffer.mark();
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
 
     MockBufferReader reader = new MockBufferReader(hadoopStream);
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -170,7 +170,7 @@ public class TestHadoop2ByteBufferReads {
     ByteBuffer readBuffer = ByteBuffer.allocate(10);
     readBuffer.limit(7);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
 
     MockBufferReader reader = new MockBufferReader(hadoopStream);
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -203,7 +203,7 @@ public class TestHadoop2ByteBufferReads {
     readBuffer.limit(7);
     readBuffer.mark();
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
 
     MockBufferReader reader = new MockBufferReader(hadoopStream);
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -233,7 +233,7 @@ public class TestHadoop2ByteBufferReads {
   public void testDirectReadFullySmallBuffer() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
 
     MockBufferReader reader = new MockBufferReader(hadoopStream);
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -253,7 +253,7 @@ public class TestHadoop2ByteBufferReads {
   public void testDirectReadFullyLargeBuffer() throws Exception {
     final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
 
     final MockBufferReader reader = new MockBufferReader(hadoopStream);
     TestUtils.assertThrows("Should throw EOFException",
@@ -279,7 +279,7 @@ public class TestHadoop2ByteBufferReads {
   public void testDirectReadFullyJustRight() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
 
     MockBufferReader reader = new MockBufferReader(hadoopStream);
     // reads all of the bytes available without EOFException
@@ -301,7 +301,7 @@ public class TestHadoop2ByteBufferReads {
   public void testDirectReadFullySmallReads() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
 
     MockBufferReader reader = new MockBufferReader(hadoopStream);
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -323,7 +323,7 @@ public class TestHadoop2ByteBufferReads {
     readBuffer.position(3);
     readBuffer.mark();
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
 
     MockBufferReader reader = new MockBufferReader(hadoopStream);
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -344,7 +344,7 @@ public class TestHadoop2ByteBufferReads {
     ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
     readBuffer.limit(7);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
 
     H2SeekableInputStream.Reader reader = new MockBufferReader(hadoopStream);
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -377,7 +377,7 @@ public class TestHadoop2ByteBufferReads {
     readBuffer.limit(7);
     readBuffer.mark();
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
 
     MockBufferReader reader = new MockBufferReader(hadoopStream);
     H2SeekableInputStream.readFully(reader, readBuffer);
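The Hadoop-2 tests above drive H2SeekableInputStream.readFully through a small wrapper built from the stream, e.g. H2SeekableInputStream.Reader reader = new MockBufferReader(hadoopStream). A sketch of what such a Reader could look like, assuming the single read(ByteBuffer) method this usage implies and assuming it sits in the tests' package (org.apache.parquet.hadoop.util) so the nested interface is visible:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;

    // Illustrative sketch only; the interface shape is inferred from the
    // test usage above, not spelled out in this diff.
    class StreamBufferReader implements H2SeekableInputStream.Reader {
      private final FSDataInputStream stream;

      StreamBufferReader(FSDataInputStream stream) {
        this.stream = stream;
      }

      @Override
      public int read(ByteBuffer readBuffer) throws IOException {
        // Hadoop 2's FSDataInputStream can read directly into a ByteBuffer
        // when the wrapped stream implements ByteBufferReadable.
        return stream.read(readBuffer);
      }
    }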
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
----------------------------------------------------------------------
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
index 5d79a49..fe64587 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
@@ -91,7 +92,7 @@ public class MergeCommand extends ArgsOnlyCommand {
         tooSmallFilesMerged = true;
       }
 
-      writer.appendFile(conf, input);
+      writer.appendFile(HadoopInputFile.fromPath(input, conf));
     }
 
     if (tooSmallFilesMerged) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 44b0b62..05e3e47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,7 +262,14 @@
                     <exclude>org/apache/parquet/avro/SpecificDataSupplier</exclude> <!-- made public -->
                     <exclude>org/apache/parquet/io/ColumnIOFactory$ColumnIOCreatorVisitor</exclude> <!-- removed non-API class -->
                     <exclude>org/apache/parquet/io/ColumnIOFactory/**</exclude> <!-- removed non-API class and methods-->
-                    <exclude>org/apache/parquet/hadoop/codec/SnappyCompressor</exclude> <!-- added synchronized modifier -->
+                    <exclude>org/apache/parquet/hadoop/codec/SnappyCompressor</exclude> <!-- added synchronized modifier -->
+                    <exclude>org/apache/parquet/bytes/BytesInput</exclude> <!-- moved to parquet-common -->
+                    <exclude>org/apache/parquet/bytes/CapacityByteArrayOutputStream</exclude> <!-- moved to parquet-common -->
+                    <exclude>org/apache/parquet/bytes/ConcatenatingByteArrayCollector</exclude> <!-- moved to parquet-common -->
+                    <exclude>org/apache/parquet/bytes/LittleEndianDataInputStream</exclude> <!-- moved to parquet-common -->
+                    <exclude>org/apache/parquet/bytes/LittleEndianDataOutputStream</exclude> <!-- moved to parquet-common -->
+                    <exclude>org/apache/parquet/hadoop/metadata/CompressionCodecName</exclude> <!-- moved to parquet-common -->
+                    <exclude>org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException</exclude> <!-- moved to parquet-common -->
                   </excludes>
                 </requireBackwardCompatibility>
              </rules>
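The MergeCommand hunk above swaps the (Configuration, Path) overload of appendFile for the InputFile-based one, wrapping each input with HadoopInputFile.fromPath. A minimal sketch of the new call pattern; the helper method is illustrative, and a created and started ParquetFileWriter is assumed:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    // Illustrative sketch only: appends each input's row groups to a writer
    // that has already been created and started, as MergeCommand does.
    class AppendFilesSketch {
      static void appendAll(ParquetFileWriter writer, Configuration conf, Iterable<Path> inputs)
          throws IOException {
        for (Path input : inputs) {
          // Wrap the Hadoop Path in an InputFile before appending.
          writer.appendFile(HadoopInputFile.fromPath(input, conf));
        }
      }
    }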