Repository: drill
Updated Branches:
  refs/heads/DRILL-2959v2 [created] e1fb13f47


DRILL-2959: Make sure to close out compression codecs.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e1fb13f4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e1fb13f4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e1fb13f4

Branch: refs/heads/DRILL-2959v2
Commit: e1fb13f47d3332f643e3229a5553652b4afa20ab
Parents: 3b19076
Author: Jacques Nadeau <jacq...@apache.org>
Authored: Mon May 4 18:14:38 2015 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Wed May 6 13:33:05 2015 +0100

----------------------------------------------------------------------
 .../exec/store/parquet/DirectCodecFactory.java  | 379 +++++++++++++++++++
 .../exec/store/parquet/DirectCodecPool.java     | 187 +++++++++
 .../ParquetDirectByteBufferAllocator.java       |  13 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |   7 -
 .../exec/store/parquet/ParquetRecordWriter.java |  11 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   3 +-
 .../store/parquet/columnreaders/PageReader.java |  32 +-
 .../columnreaders/ParquetRecordReader.java      |  41 +-
 .../exec/store/parquet2/DrillParquetReader.java |  23 +-
 .../parquet/hadoop/CodecFactoryExposer.java     | 160 --------
 .../parquet/hadoop/ColumnChunkIncReadStore.java |   7 +-
 .../ColumnChunkPageWriteStoreExposer.java       |  14 +-
 .../exec/store/TestDirectCodecFactory.java      | 155 ++++++++
 .../store/parquet/ParquetRecordReaderTest.java  |   3 +-
 pom.xml                                         |   2 +-
 15 files changed, 797 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
new file mode 100644
index 0000000..ed455a2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.drill.exec.store.parquet; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.IdentityHashMap; + +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.xerial.snappy.Snappy; + +import parquet.bytes.ByteBufferAllocator; +import parquet.bytes.BytesInput; +import parquet.hadoop.CodecFactory; +import parquet.hadoop.CodecFactory.BytesCompressor; +import parquet.hadoop.HeapCodecFactory.HeapBytesCompressor; +import parquet.hadoop.HeapCodecFactory.HeapBytesDecompressor; +import parquet.hadoop.metadata.CompressionCodecName; + +import com.google.common.base.Preconditions; + +public class DirectCodecFactory extends CodecFactory<BytesCompressor, DirectBytesDecompressor> implements AutoCloseable { + + private final ByteBufferAllocator allocator; + private final IdentityHashMap<ByteBuffer, Integer> allocatedBuffers = new IdentityHashMap<ByteBuffer, Integer>(); + + public DirectCodecFactory(Configuration config, ByteBufferAllocator allocator) { + super(config); + Preconditions.checkNotNull(allocator); + this.allocator = allocator; + } + + public DirectCodecFactory(Configuration config, BufferAllocator allocator) { + this(config, new ParquetDirectByteBufferAllocator(allocator)); + } + + private ByteBuffer ensure(ByteBuffer buffer, int size) { + if (buffer == null) { + buffer = allocator.allocate(size); + allocatedBuffers.put(buffer, 0); + } else if (buffer.capacity() >= size) { + buffer.clear(); + } else { + allocator.release(buffer); + release(buffer); + buffer = allocator.allocate(size); + allocatedBuffers.put(buffer, 0); + } + return buffer; + } + + ByteBuffer release(ByteBuffer buffer) { + if (buffer != null) { + allocator.release(buffer); + allocatedBuffers.remove(buffer); + } + return null; + } + + @Override + protected BytesCompressor createCompressor(final CompressionCodecName codecName, final CompressionCodec codec, + int pageSize) { + + if (codec == null) { + return new NoopCompressor(); + } else if (codecName == CompressionCodecName.SNAPPY) { + // avoid using the Parquet Snappy codec since it allocates direct buffers at awkward spots. + return new SnappyCompressor(); + } else { + + // todo: move zlib above since it also generates allocateDirect calls. 
+ return new HeapBytesCompressor(codecName, codec, pageSize); + } + } + + @Override + protected DirectBytesDecompressor createDecompressor(final CompressionCodec codec) { + // if (true) { + // return new HeapFakeDirect(codec); + // } + + if (codec == null) { + return new NoopDecompressor(); + } else if (DirectCodecPool.INSTANCE.codec(codec).supportsDirectDecompression()) { + return new FullDirectDecompressor(codec); + } else { + // return new HeapFakeDirect(codec); + return new IndirectDecompressor(codec); + } + } + + public void close() { + release(); + } + + private class HeapFakeDirect extends DirectBytesDecompressor { + + private final ExposedHeapBytesDecompressor innerCompressor; + + public HeapFakeDirect(CompressionCodec codec){ + innerCompressor = new ExposedHeapBytesDecompressor(codec); + } + + @Override + public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize) + throws IOException { + BytesInput uncompressed = decompress(new ByteBufBytesInput(input), uncompressedSize); + output.clear(); + output.setBytes(0, uncompressed.toByteArray()); + output.writerIndex((int) uncompressed.size()); + } + + @Override + public BytesInput decompress(BytesInput paramBytesInput, int uncompressedSize) throws IOException { + return innerCompressor.decompress(paramBytesInput, uncompressedSize); + } + + @Override + protected void release() { + innerCompressor.release(); + } + + } + + private class ExposedHeapBytesDecompressor extends HeapBytesDecompressor { + public ExposedHeapBytesDecompressor(CompressionCodec codec) { + super(codec); + } + + public void release() { + super.release(); + } + } + + public class IndirectDecompressor extends DirectBytesDecompressor { + private final Decompressor decompressor; + + public IndirectDecompressor(CompressionCodec codec) { + this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor(); + } + + @Override + public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { + decompressor.reset(); + byte[] inputBytes = bytes.toByteArray(); + decompressor.setInput(inputBytes, 0, inputBytes.length); + byte[] output = new byte[uncompressedSize]; + decompressor.decompress(output, 0, uncompressedSize); + return BytesInput.from(output); + } + + @Override + public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize) + throws IOException { + + decompressor.reset(); + byte[] inputBytes = new byte[input.capacity()]; + input.getBytes(0, inputBytes); + decompressor.setInput(inputBytes, 0, inputBytes.length); + byte[] outputBytes = new byte[uncompressedSize]; + decompressor.decompress(outputBytes, 0, uncompressedSize); + output.clear(); + output.writeBytes(outputBytes); + } + + @Override + protected void release() { + DirectCodecPool.INSTANCE.returnDecompressor(decompressor); + } + } + + public class FullDirectDecompressor extends DirectBytesDecompressor { + private final DirectDecompressor decompressor; + private ByteBuffer compressedBuffer; + private ByteBuffer uncompressedBuffer; + private ExposedHeapBytesDecompressor extraDecompressor; + public FullDirectDecompressor(CompressionCodec codec){ + this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDirectDecompressor(); + this.extraDecompressor = new ExposedHeapBytesDecompressor(codec); + } + + @Override + public BytesInput decompress(BytesInput compressedBytes, int uncompressedSize) throws IOException { + + if(false){ + // TODO: fix direct path. 
(currently, this code is causing issues when writing complex Parquet files. + ByteBuffer bufferIn = compressedBytes.toByteBuffer(); + uncompressedBuffer = ensure(uncompressedBuffer, uncompressedSize); + uncompressedBuffer.clear(); + + if (bufferIn.isDirect()) { + decompressor.decompress(bufferIn, uncompressedBuffer); + } else { + compressedBuffer = ensure(this.compressedBuffer, (int) compressedBytes.size()); + compressedBuffer.clear(); + compressedBuffer.put(bufferIn); + compressedBuffer.flip(); + decompressor.decompress(compressedBuffer, uncompressedBuffer); + } + return BytesInput.from(uncompressedBuffer, 0, uncompressedSize); + + } else { + return extraDecompressor.decompress(compressedBytes, uncompressedSize); + } + + + } + + + @Override + public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize) + throws IOException { + output.clear(); + decompressor.decompress(input.nioBuffer(0, compressedSize), output.nioBuffer(0, uncompressedSize)); + output.writerIndex(uncompressedSize); + } + + @Override + protected void release() { + compressedBuffer = DirectCodecFactory.this.release(compressedBuffer); + uncompressedBuffer = DirectCodecFactory.this.release(uncompressedBuffer); + DirectCodecPool.INSTANCE.returnDecompressor(decompressor); + extraDecompressor.release(); + } + + } + + public class NoopDecompressor extends DirectBytesDecompressor { + + @Override + public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize) + throws IOException { + Preconditions.checkArgument(compressedSize == uncompressedSize, + "Non-compressed data did not have matching compressed and uncompressed sizes."); + output.clear(); + output.writeBytes(input, compressedSize); + } + + @Override + public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { + return bytes; + } + + @Override + protected void release() { + } + + } + + public class SnappyCompressor extends BytesCompressor { + + private ByteBuffer incoming; + private ByteBuffer outgoing; + + public SnappyCompressor() { + super(); + } + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size()); + ByteBuffer bufferIn = bytes.toByteBuffer(); + outgoing = ensure(outgoing, maxOutputSize); + final int size; + if (bufferIn.isDirect()) { + size = Snappy.compress(bufferIn, outgoing); + } else { + this.incoming = ensure(this.incoming, (int) bytes.size()); + this.incoming.put(bufferIn); + this.incoming.flip(); + size = Snappy.compress(this.incoming, outgoing); + } + + return BytesInput.from(outgoing, 0, (int) size); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.SNAPPY; + } + + @Override + protected void release() { + outgoing = DirectCodecFactory.this.release(outgoing); + incoming = DirectCodecFactory.this.release(incoming); + } + + } + + public static class NoopCompressor extends BytesCompressor { + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + return bytes; + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.UNCOMPRESSED; + } + + @Override + protected void release() { + } + + } + + public static class ByteBufBytesInput extends BytesInput { + private final ByteBuf buf; + private final int length; + + public ByteBufBytesInput(ByteBuf buf) { + this(buf, 0, buf.capacity()); + } + + public ByteBufBytesInput(ByteBuf buf, int offset, int length) { + super(); 
+ if(buf.capacity() == length && offset == 0){ + this.buf = buf; + }else{ + this.buf = buf.slice(offset, length); + } + + this.length = length; + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + final WritableByteChannel outputChannel = Channels.newChannel(out); + outputChannel.write(buf.nioBuffer()); + } + + @Override + public ByteBuffer toByteBuffer() throws IOException { + return buf.nioBuffer(); + } + + @Override + public long size() { + return length; + } + } + + + public abstract class DirectBytesDecompressor extends CodecFactory.BytesDecompressor { + public abstract void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize) + throws IOException; + } + + + +} + http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java new file mode 100644 index 0000000..26d97c9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet; + +import java.util.Collections; +import java.util.Map; + +import org.apache.commons.pool.BasePoolableObjectFactory; +import org.apache.commons.pool.impl.GenericObjectPool; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressionCodec; +import org.apache.hadoop.io.compress.DirectDecompressor; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +public class DirectCodecPool { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectCodecPool.class); + + public static final DirectCodecPool INSTANCE = new DirectCodecPool(); + + @SuppressWarnings("unchecked") + private final Map<CompressionCodec, CodecPool> codecs = (Map<CompressionCodec, CodecPool>) (Object) Collections.synchronizedMap(Maps.newHashMap()); + + @SuppressWarnings("unchecked") + private final Map<Class<?>, GenericObjectPool> directDePools = (Map<Class<?>, GenericObjectPool>) (Object) Collections + .synchronizedMap(Maps.newHashMap()); + private final Map<Class<?>, GenericObjectPool> dePools = (Map<Class<?>, GenericObjectPool>) (Object) Collections + .synchronizedMap(Maps.newHashMap()); + private final Map<Class<?>, GenericObjectPool> cPools = (Map<Class<?>, GenericObjectPool>) (Object) Collections + .synchronizedMap(Maps.newHashMap()); + + private DirectCodecPool() { + } + + public class CodecPool { + private final GenericObjectPool compressorPool; + private final GenericObjectPool decompressorPool; + private final GenericObjectPool directDecompressorPool; + private final boolean supportDirectDecompressor; + + private CodecPool(final CompressionCodec codec){ + try { + boolean supportDirectDecompressor = codec instanceof DirectDecompressionCodec; + compressorPool = new GenericObjectPool(new BasePoolableObjectFactory() { + public Object makeObject() throws Exception { + return codec.createCompressor(); + } + }, Integer.MAX_VALUE); + + Object com = compressorPool.borrowObject(); + if (com != null) { + cPools.put(com.getClass(), compressorPool); + compressorPool.returnObject(com); + }else{ + logger.warn("Unable to find compressor for codec {}", codec.getClass().getName()); + } + + decompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() { + public Object makeObject() throws Exception { + return codec.createDecompressor(); + } + }, Integer.MAX_VALUE); + + Object decom = decompressorPool.borrowObject(); + if (decom != null) { + dePools.put(decom.getClass(), decompressorPool); + decompressorPool.returnObject(decom); + } else { + logger.warn("Unable to find decompressor for codec {}", codec.getClass().getName()); + } + + if (supportDirectDecompressor) { + directDecompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() { + public Object makeObject() throws Exception { + return ((DirectDecompressionCodec) codec).createDirectDecompressor(); + } + }, Integer.MAX_VALUE); + + Object ddecom = directDecompressorPool.borrowObject(); + if (ddecom != null) { + directDePools.put(ddecom.getClass(), directDecompressorPool); + directDecompressorPool.returnObject(ddecom); + + } else { + supportDirectDecompressor = false; + logger.warn("Unable to find direct decompressor for codec {}", codec.getClass().getName()); + } + + } else { + directDecompressorPool = null; + } + + 
this.supportDirectDecompressor = supportDirectDecompressor; + } catch (Exception e) { + throw new DrillRuntimeException(e); + } + } + + public DirectDecompressor borrowDirectDecompressor(){ + Preconditions.checkArgument(supportDirectDecompressor, "Tried to get a direct Decompressor from a non-direct codec."); + try { + return (DirectDecompressor) directDecompressorPool.borrowObject(); + } catch (Exception e) { + throw new DrillRuntimeException(e); + } + } + + public boolean supportsDirectDecompression() { + return supportDirectDecompressor; + } + + public Decompressor borrowDecompressor(){ + try { + return (Decompressor) decompressorPool.borrowObject(); + } catch (Exception e) { + throw new DrillRuntimeException(e); + } + } + + public Compressor borrowCompressor(){ + try { + return (Compressor) compressorPool.borrowObject(); + } catch (Exception e) { + throw new DrillRuntimeException(e); + } + } + } + + public CodecPool codec(CompressionCodec codec){ + CodecPool pools = codecs.get(codec); + if(pools == null){ + synchronized(this){ + pools = codecs.get(codec); + if(pools == null){ + pools = new CodecPool(codec); + codecs.put(codec, pools); + } + } + } + return pools; + } + + private void returnToPool(Object obj, Map<Class<?>, GenericObjectPool> pools) { + try { + GenericObjectPool pool = pools.get(obj.getClass()); + if (pool == null) { + throw new IllegalStateException("Received unexpected decompressor."); + } + pool.returnObject(obj); + } catch (Exception e) { + throw new DrillRuntimeException(e); + } + + } + + public void returnCompressor(Compressor compressor) { + returnToPool(compressor, cPools); + } + + public void returnDecompressor(Decompressor decompressor) { + returnToPool(decompressor, dePools); + } + + public void returnDecompressor(DirectDecompressor decompressor) { + returnToPool(decompressor, directDePools); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java index 45a1dc6..79d1b90 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.OperatorContext; import parquet.bytes.ByteBufferAllocator; @@ -32,17 +33,21 @@ import parquet.bytes.ByteBufferAllocator; public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class); - private OperatorContext oContext; - private HashMap<Integer, ByteBuf> allocatedBuffers = new HashMap<Integer, ByteBuf>(); + private final BufferAllocator allocator; + private final HashMap<Integer, ByteBuf> allocatedBuffers = new HashMap<Integer, ByteBuf>(); public ParquetDirectByteBufferAllocator(OperatorContext o){ - oContext=o; + allocator = o.getAllocator(); + } + + public ParquetDirectByteBufferAllocator(BufferAllocator allocator) { + this.allocator = allocator; } @Override public 
ByteBuffer allocate(int sz) { - ByteBuf bb = oContext.getAllocator().buffer(sz); + ByteBuf bb = allocator.buffer(sz); ByteBuffer b = bb.nioBuffer(0, sz); allocatedBuffers.put(System.identityHashCode(b), bb); logger.debug("ParquetDirectByteBufferAllocator: Allocated "+sz+" bytes. Allocated ByteBuffer id: "+System.identityHashCode(b)); http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java index cfa4c93..322a88d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java @@ -54,7 +54,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.CodecFactoryExposer; import parquet.hadoop.ParquetFileWriter; import com.google.common.collect.ImmutableSet; @@ -74,7 +73,6 @@ public class ParquetFormatPlugin implements FormatPlugin{ private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC)); private final DrillbitContext context; - private final CodecFactoryExposer codecFactoryExposer; private final Configuration fsConf; private final ParquetFormatMatcher formatMatcher; private final ParquetFormatConfig config; @@ -89,7 +87,6 @@ public class ParquetFormatPlugin implements FormatPlugin{ public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){ this.context = context; - this.codecFactoryExposer = new CodecFactoryExposer(fsConf); this.config = formatConfig; this.formatMatcher = new ParquetFormatMatcher(this); this.storageConfig = storageConfig; @@ -171,10 +168,6 @@ public class ParquetFormatPlugin implements FormatPlugin{ return storageConfig; } - public CodecFactoryExposer getCodecFactoryExposer() { - return codecFactoryExposer; - } - public String getName(){ return name; } http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index 3506ffa..8615eb7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -80,6 +80,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { private boolean validating = false; private CompressionCodecName codec = CompressionCodecName.SNAPPY; private WriterVersion writerVersion = WriterVersion.PARQUET_1_0; + private DirectCodecFactory codecFactory; private long recordCount = 0; private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; @@ -100,6 +101,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws 
OutOfMemoryException{ super(); this.oContext = context.newOperatorContext(writer, true); + this.codecFactory = new DirectCodecFactory(writer.getFormatPlugin().getFsConf(), oContext.getAllocator()); } @Override @@ -156,10 +158,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5); pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext, - codec, - pageSize, - this.schema, - initialBlockBufferSize); + codecFactory.getCompressor(codec, pageSize), + schema, + initialBlockBufferSize); int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion); MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(this.schema); @@ -332,6 +333,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { ColumnChunkPageWriteStoreExposer.close(pageStore); } + codecFactory.close(); + if (!hasRecords) { // the very last file is empty, delete it (DRILL-2408) Path path = getPath(); http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index d5586ce..d5b7303 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -39,7 +39,6 @@ import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.drill.exec.store.parquet2.DrillParquetReader; -import org.apache.drill.exec.util.ImpersonationUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -130,7 +129,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan readers.add( new ParquetRecordReader( context, e.getPath(), e.getRowGroupIndex(), fs, - rowGroupScan.getStorageEngine().getCodecFactoryExposer(), + new DirectCodecFactory(fs.getConf(), oContext.getAllocator()), footers.get(e.getPath()), rowGroupScan.getColumns() ) http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index 6a41a04..28a8b23 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -28,6 +28,9 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.store.parquet.ColumnDataReader; +import org.apache.drill.exec.store.parquet.DirectCodecFactory; +import 
org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput; +import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor; import org.apache.drill.exec.store.parquet.ParquetFormatPlugin; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -45,8 +48,6 @@ import parquet.format.PageHeader; import parquet.format.PageType; import parquet.format.Util; import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.CodecFactoryExposer; -import parquet.hadoop.CodecFactoryExposer.HadoopByteBufBytesInput; import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.CompressionCodecName; import parquet.schema.PrimitiveType; @@ -101,13 +102,13 @@ final class PageReader { // These need to be held throughout reading of the entire column chunk List<ByteBuf> allocatedDictionaryBuffers; - private final CodecFactoryExposer codecFactory; + private final DirectCodecFactory codecFactory; PageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{ this.parentColumnReader = parentStatus; allocatedDictionaryBuffers = new ArrayList<ByteBuf>(); - codecFactory = parentColumnReader.parentReader.getCodecFactoryExposer(); + codecFactory = parentColumnReader.parentReader.getCodecFactory(); long start = columnChunkMetaData.getFirstDataPageOffset(); try { @@ -137,10 +138,12 @@ final class PageReader { final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size); try { dataReader.loadPage(compressedData, pageHeader.compressed_page_size); - codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(), + DirectBytesDecompressor decompressor = codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData + .getCodec()); + decompressor.decompress( compressedData, - dictionaryData, pageHeader.compressed_page_size, + dictionaryData, pageHeader.getUncompressed_page_size()); } finally { @@ -149,7 +152,7 @@ final class PageReader { } DictionaryPage page = new DictionaryPage( - getBytesInput(dictionaryData), + asBytesInput(dictionaryData), pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values, parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name()) @@ -158,9 +161,8 @@ final class PageReader { } } - public static BytesInput getBytesInput(DrillBuf uncompressedByteBuf) throws IOException { - final ByteBuffer outBuffer = uncompressedByteBuf.nioBuffer(0, uncompressedByteBuf.capacity()); - return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit()); + public static BytesInput asBytesInput(DrillBuf buf) throws IOException { + return new ByteBufBytesInput(buf); } /** @@ -197,17 +199,17 @@ final class PageReader { final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size); try{ dataReader.loadPage(compressedData, pageHeader.compressed_page_size); - codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(), + codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()).decompress( compressedData, - uncompressedData, pageHeader.compressed_page_size, + uncompressedData, pageHeader.getUncompressed_page_size()); } finally { compressedData.release(); } } DictionaryPage page = new DictionaryPage( - getBytesInput(uncompressedData), + asBytesInput(uncompressedData), pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values, 
parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name()) @@ -225,10 +227,10 @@ final class PageReader { }else{ final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size); dataReader.loadPage(compressedData, pageHeader.compressed_page_size); - codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(), + codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()).decompress( compressedData, - pageData, pageHeader.compressed_page_size, + pageData, pageHeader.getUncompressed_page_size()); compressedData.release(); } http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 11d0042..2072aae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -30,7 +30,6 @@ import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.Types; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; @@ -39,8 +38,8 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.MaterializedField.Key; import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.parquet.DirectCodecFactory; import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.NullableBitVector; import org.apache.drill.exec.vector.NullableIntVector; import org.apache.drill.exec.vector.RepeatedFixedWidthVector; import org.apache.drill.exec.vector.ValueVector; @@ -51,7 +50,6 @@ import parquet.column.ColumnDescriptor; import parquet.format.FileMetaData; import parquet.format.SchemaElement; import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.CodecFactoryExposer; import parquet.hadoop.ParquetFileWriter; import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.ParquetMetadata; @@ -103,36 +101,41 @@ public class ParquetRecordReader extends AbstractRecordReader { // records specified in the row group metadata long mockRecordsRead; - private final CodecFactoryExposer codecFactoryExposer; + private final DirectCodecFactory codecFactory; int rowGroupIndex; long totalRecordsRead; - public ParquetRecordReader(FragmentContext fragmentContext, // - String path, // - int rowGroupIndex, // - FileSystem fs, // - CodecFactoryExposer codecFactoryExposer, // - ParquetMetadata footer, // + public ParquetRecordReader(FragmentContext fragmentContext, + String path, + int rowGroupIndex, + FileSystem fs, + DirectCodecFactory codecFactory, + ParquetMetadata footer, List<SchemaPath> columns) throws ExecutionSetupException { - this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, 
codecFactoryExposer, footer, + this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer, columns); } - public ParquetRecordReader(FragmentContext fragmentContext, long batchSize, - String path, int rowGroupIndex, FileSystem fs, - CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer, - List<SchemaPath> columns) throws ExecutionSetupException { + public ParquetRecordReader( + FragmentContext fragmentContext, + long batchSize, + String path, + int rowGroupIndex, + FileSystem fs, + DirectCodecFactory codecFactory, + ParquetMetadata footer, + List<SchemaPath> columns) throws ExecutionSetupException { this.hadoopPath = new Path(path); this.fileSystem = fs; - this.codecFactoryExposer = codecFactoryExposer; + this.codecFactory = codecFactory; this.rowGroupIndex = rowGroupIndex; this.batchSize = batchSize; this.footer = footer; setColumns(columns); } - public CodecFactoryExposer getCodecFactoryExposer() { - return codecFactoryExposer; + public DirectCodecFactory getCodecFactory() { + return codecFactory; } public Path getHadoopPath() { @@ -452,6 +455,8 @@ public class ParquetRecordReader extends AbstractRecordReader { } columnStatuses.clear(); + codecFactory.close(); + for (VarLengthColumn r : varLengthReader.columns) { r.clear(); } http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 921d134..07950df 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -18,16 +18,14 @@ package org.apache.drill.exec.store.parquet2; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.Collection; -import java.util.ArrayList; -import java.util.Map; import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; -import com.google.common.collect.Sets; - import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; @@ -43,37 +41,30 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.MaterializedField.Key; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.parquet.DirectCodecFactory; import org.apache.drill.exec.store.parquet.RowGroupReadEntry; import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.BaseValueVector; import org.apache.drill.exec.vector.NullableIntVector; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.NullableBitVector; import org.apache.drill.exec.vector.VariableWidthVector; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import parquet.column.ColumnDescriptor; import parquet.common.schema.ColumnPath; -import parquet.hadoop.CodecFactoryExposer; import 
parquet.hadoop.ColumnChunkIncReadStore; import parquet.hadoop.metadata.BlockMetaData; import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.ParquetMetadata; import parquet.io.ColumnIOFactory; -import parquet.io.InvalidRecordException; import parquet.io.MessageColumnIO; import parquet.schema.GroupType; import parquet.schema.MessageType; import parquet.schema.Type; -import parquet.schema.PrimitiveType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; - -import parquet.schema.Types; +import com.google.common.collect.Sets; public class DrillParquetReader extends AbstractRecordReader { @@ -247,7 +238,6 @@ public class DrillParquetReader extends AbstractRecordReader { paths.put(md.getPath(), md); } - CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(fileSystem.getConf()); Path filePath = new Path(entry.getPath()); BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex()); @@ -255,7 +245,8 @@ public class DrillParquetReader extends AbstractRecordReader { recordCount = (int) blockMetaData.getRowCount(); pageReadStore = new ColumnChunkIncReadStore(recordCount, - codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fileSystem, filePath); + new DirectCodecFactory(fileSystem.getConf(), operatorContext.getAllocator()), operatorContext.getAllocator(), + fileSystem, filePath); for (String[] path : schema.getPaths()) { Type type = schema.getType(path); http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java deleted file mode 100644 index 5438660..0000000 --- a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package parquet.hadoop; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.DrillBuf; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.DirectDecompressionCodec; -import org.apache.hadoop.io.compress.DirectDecompressor; -import org.apache.hadoop.util.ReflectionUtils; - -import parquet.bytes.BytesInput; -import parquet.hadoop.metadata.CompressionCodecName; - -public class CodecFactoryExposer{ - - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CodecFactoryExposer.class); - - private CodecFactory codecFactory; - private final Map<String, org.apache.hadoop.io.compress.DirectDecompressionCodec> codecByName = new HashMap<String, org.apache.hadoop.io.compress.DirectDecompressionCodec>(); - private Configuration configuration; - - public CodecFactoryExposer(Configuration config){ - codecFactory = new CodecFactory(config);configuration = config; - } - - public CodecFactory getCodecFactory() { - return codecFactory; - } - - public BytesInput decompress(BytesInput bytes, int uncompressedSize, CompressionCodecName codecName) throws IOException { - return codecFactory.getDecompressor(codecName).decompress(bytes, uncompressedSize); - } - - public static BytesInput getBytesInput(ByteBuf uncompressedByteBuf, int uncompressedSize) throws IOException { - ByteBuffer outBuffer=uncompressedByteBuf.nioBuffer(0, uncompressedSize); - return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit()); - } - - public void decompress(CompressionCodecName codecName, - final DrillBuf compressedByteBuf, - final DrillBuf uncompressedByteBuf, - int compressedSize, - int uncompressedSize) throws IOException { - final ByteBuffer inpBuffer = compressedByteBuf.nioBuffer(0, compressedSize); - final ByteBuffer outBuffer = uncompressedByteBuf.nioBuffer(0, uncompressedSize); - CompressionCodec c = getCodec(codecName); - //TODO: Create the decompressor only once at init time. - Class<?> cx = c.getClass(); - - DirectDecompressionCodec d=null; - DirectDecompressor decompr=null; - - if (DirectDecompressionCodec.class.isAssignableFrom(cx)) { - d=(DirectDecompressionCodec)c; - } - - if(d!=null) { - decompr = d.createDirectDecompressor(); - } - - if(d!=null && decompr!=null){ - decompr.decompress(inpBuffer, outBuffer); - }else{ - logger.warn("This Hadoop implementation does not support a " + codecName + - " direct decompression codec interface. "+ - "Direct decompression is available only on *nix systems with Hadoop 2.3 or greater. "+ - "Read operations will be a little slower. "); - BytesInput outBytesInp = this.decompress( - new HadoopByteBufBytesInput(inpBuffer, 0, inpBuffer.limit()), - uncompressedSize, - codecName); - // COPY the data back into the output buffer. - // (DrillBufs can only refer to direct memory, so we cannot pass back a BytesInput backed - // by a byte array). 
- outBuffer.put(outBytesInp.toByteArray()); - } - } - - private DirectDecompressionCodec getCodec(CompressionCodecName codecName) { - String codecClassName = codecName.getHadoopCompressionCodecClassName(); - if (codecClassName == null) { - return null; - } - DirectDecompressionCodec codec = codecByName.get(codecClassName); - if (codec != null) { - return codec; - } - - try { - Class<?> codecClass = Class.forName(codecClassName); - codec = (DirectDecompressionCodec)ReflectionUtils.newInstance(codecClass, configuration); - codecByName.put(codecClassName, codec); - return codec; - } catch (ClassNotFoundException e) { - throw new BadConfigurationException("Class " + codecClassName + " was not found", e); - } - } - - public static class HadoopByteBufBytesInput extends BytesInput { - - private final ByteBuffer byteBuf; - private final int length; - private final int offset; - - public HadoopByteBufBytesInput(ByteBuffer byteBuf, int offset, int length) { - super(); - this.byteBuf = byteBuf; - this.offset = offset; - this.length = length; - } - - @Override - public void writeAllTo(OutputStream out) throws IOException { - final WritableByteChannel outputChannel = Channels.newChannel(out); - byteBuf.position(offset); - ByteBuffer tempBuf = byteBuf.slice(); - tempBuf.limit(length); - outputChannel.write(tempBuf); - } - - @Override - public ByteBuffer toByteBuffer() throws IOException { - byteBuf.position(offset); - ByteBuffer buf = byteBuf.slice(); - buf.limit(length); - return buf; - } - - @Override - public long size() { - return length; - } - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java index 242cd28..6337d4c 100644 --- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java +++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java @@ -28,7 +28,7 @@ import java.util.Map; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.hadoop.conf.Configuration; +import org.apache.drill.exec.store.parquet.DirectCodecFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -52,14 +52,15 @@ public class ColumnChunkIncReadStore implements PageReadStore { private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - private CodecFactory codecFactory = new CodecFactory(new Configuration()); + private DirectCodecFactory codecFactory; private BufferAllocator allocator; private FileSystem fs; private Path path; private long rowCount; private List<FSDataInputStream> streams = new ArrayList(); - public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator, FileSystem fs, Path path) { + public ColumnChunkIncReadStore(long rowCount, DirectCodecFactory codecFactory, BufferAllocator allocator, + FileSystem fs, Path path) { this.codecFactory = codecFactory; this.allocator = allocator; this.fs = fs; http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java index 0e9dec0..743d185 100644 --- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java +++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java @@ -21,21 +21,19 @@ import java.io.IOException; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator; -import org.apache.hadoop.conf.Configuration; import parquet.column.page.PageWriteStore; import parquet.hadoop.CodecFactory.BytesCompressor; -import parquet.hadoop.metadata.CompressionCodecName; import parquet.schema.MessageType; public class ColumnChunkPageWriteStoreExposer { - public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(OperatorContext oContext, - CompressionCodecName codec, - int pageSize, - MessageType schema, - int initialSize) { - BytesCompressor compressor = new CodecFactory(new Configuration()).getCompressor(codec, pageSize); + public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore( + OperatorContext oContext, + BytesCompressor compressor, + MessageType schema, + int initialSize + ) { return new ColumnChunkPageWriteStore(compressor, schema, initialSize, new ParquetDirectByteBufferAllocator(oContext)); } http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java new file mode 100644 index 0000000..644144e --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store; + +import io.netty.buffer.DrillBuf; + +import java.nio.ByteBuffer; +import java.util.Random; + +import org.apache.drill.common.DeferredException; +import org.apache.drill.exec.ExecTest; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.store.parquet.DirectCodecFactory; +import org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput; +import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +import parquet.bytes.BytesInput; +import parquet.hadoop.CodecFactory.BytesCompressor; +import parquet.hadoop.metadata.CompressionCodecName; + +public class TestDirectCodecFactory extends ExecTest { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestDirectCodecFactory.class); + + private static enum Decompression { + ON_HEAP, OFF_HEAP, DRILLBUF + } + + private void test(int size, CompressionCodecName codec, boolean useOnHeapCompression, Decompression decomp) { + DrillBuf rawBuf = null; + DrillBuf outBuf = null; + try (BufferAllocator allocator = new TopLevelAllocator(); + DirectCodecFactory codecFactory = new DirectCodecFactory(new Configuration(), allocator)) { + try { + rawBuf = allocator.buffer(size); + final byte[] rawArr = new byte[size]; + outBuf = allocator.buffer(size * 2); + Random r = new Random(); + byte[] random = new byte[1024]; + int pos = 0; + while (pos < size) { + r.nextBytes(random); + rawBuf.writeBytes(random); + System.arraycopy(random, 0, rawArr, pos, random.length); + pos += random.length; + } + + BytesCompressor c = codecFactory.getCompressor(codec, 64 * 1024); + DirectBytesDecompressor d = codecFactory.getDecompressor(codec); + + BytesInput compressed; + if (useOnHeapCompression) { + compressed = c.compress(BytesInput.from(rawArr)); + } else { + compressed = c.compress(new ByteBufBytesInput(rawBuf)); + } + + switch (decomp) { + case DRILLBUF: { + ByteBuffer buf = compressed.toByteBuffer(); + DrillBuf b = allocator.buffer(buf.capacity()); + try { + b.writeBytes(buf); + d.decompress(b, (int) compressed.size(), outBuf, size); + for (int i = 0; i < size; i++) { + Assert.assertTrue("Data didn't match at " + i, outBuf.getByte(i) == rawBuf.getByte(i)); + } + } finally { + b.release(); + } + break; + } + + case OFF_HEAP: { + ByteBuffer buf = compressed.toByteBuffer(); + DrillBuf b = allocator.buffer(buf.capacity()); + try { + b.writeBytes(buf); + BytesInput input = d.decompress(new ByteBufBytesInput(b), size); + Assert.assertArrayEquals(input.toByteArray(), rawArr); + } finally { + b.release(); + } + break; + } + case ON_HEAP: { + byte[] buf = compressed.toByteArray(); + BytesInput input = d.decompress(BytesInput.from(buf), size); + Assert.assertArrayEquals(input.toByteArray(), rawArr); + break; + } + } + } catch (Exception e) { + String msg = String.format( + "Failure while testing Codec: %s, OnHeapCompressionInput: %s, Decompression Mode: %s, Data Size: %d", + codec.name(), + useOnHeapCompression, decomp.name(), size); + System.out.println(msg); + throw new RuntimeException(msg, e); + } finally { + if (rawBuf != null) { + rawBuf.release(); + } + if (outBuf != null) { + outBuf.release(); + } + } + } + } + + @Test + public void compressionCodecs() throws Exception { + int[] sizes = { 4 * 1024, 1 * 1024 * 1024 }; + boolean[] comp = { true, false }; + + try (DeferredException ex = new 
DeferredException()) { + for (int size : sizes) { + for (boolean useOnHeapComp : comp) { + for (Decompression decomp : Decompression.values()) { + for (CompressionCodecName codec : CompressionCodecName.values()) { + if (codec == CompressionCodecName.LZO) { + // not installed as gpl. + continue; + } + try { + test(size, codec, useOnHeapComp, decomp); + } catch (Exception e) { + ex.addException(e); + } + } + } + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index e50e3fb..83a1cb8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -70,7 +70,6 @@ import parquet.bytes.BytesInput; import parquet.column.page.DataPageV1; import parquet.column.page.PageReadStore; import parquet.column.page.PageReader; -import parquet.hadoop.CodecFactoryExposer; import parquet.hadoop.Footer; import parquet.hadoop.ParquetFileReader; import parquet.hadoop.metadata.ParquetMetadata; @@ -625,7 +624,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ BufferAllocator allocator = new TopLevelAllocator(); for(int i = 0; i < 25; i++) { ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs, - new CodecFactoryExposer(dfsConfig), f.getParquetMetadata(), columns); + new DirectCodecFactory(dfsConfig, allocator), f.getParquetMetadata(), columns); TestOutputMutator mutator = new TestOutputMutator(allocator); rr.setup(mutator); Stopwatch watch = new Stopwatch(); http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f0f4bc5..a207f2a 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ <proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path> <dep.junit.version>4.11</dep.junit.version> <dep.slf4j.version>1.7.6</dep.slf4j.version> - <parquet.version>1.6.0rc3-drill-r0.1</parquet.version> + <parquet.version>1.6.0rc3-drill-r0.3</parquet.version> </properties> <scm>
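
----------------------------------------------------------------------

For reference, here is a minimal usage sketch (not part of the commit) of the lifecycle this change enforces: a DirectCodecFactory is created per reader/writer, hands out pooled compressors and decompressors, and is closed when the operator finishes so codecs are returned to DirectCodecPool and any scratch buffers are freed. It mirrors the pattern used in TestDirectCodecFactory above; the class name, the 64 KB page size, and the zero-filled payload are illustrative only.

import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.store.parquet.DirectCodecFactory;
import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor;
import org.apache.hadoop.conf.Configuration;

import parquet.bytes.BytesInput;
import parquet.hadoop.CodecFactory.BytesCompressor;
import parquet.hadoop.metadata.CompressionCodecName;

public class CodecCloseOutSketch {
  public static void main(String[] args) throws Exception {
    final byte[] raw = new byte[64 * 1024]; // illustrative payload

    // Both the allocator and the codec factory are AutoCloseable, so try-with-resources
    // guarantees the pooled (de)compressors and scratch buffers are released even on error.
    try (BufferAllocator allocator = new TopLevelAllocator();
         DirectCodecFactory codecFactory = new DirectCodecFactory(new Configuration(), allocator)) {

      // Borrow a compressor and a decompressor (backed by DirectCodecPool) from the factory.
      BytesCompressor compressor = codecFactory.getCompressor(CompressionCodecName.SNAPPY, 64 * 1024);
      DirectBytesDecompressor decompressor = codecFactory.getDecompressor(CompressionCodecName.SNAPPY);

      // Round-trip the payload through the heap BytesInput path.
      BytesInput compressed = compressor.compress(BytesInput.from(raw));
      BytesInput uncompressed = decompressor.decompress(compressed, raw.length);

      System.out.println("round-tripped " + uncompressed.size() + " bytes");
    } // close() returns the codecs to DirectCodecPool and releases any cached ByteBuffers
  }
}

The same close-out now happens in the operators themselves: ParquetRecordReader calls codecFactory.close() in its cleanup path, and ParquetRecordWriter closes its factory after the page write store is closed, which is the fix DRILL-2959 is after.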