DRILL-1368 - Fix to read Parquet Gzip files on systems
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ef254649
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ef254649
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ef254649

Branch: refs/heads/master
Commit: ef254649f89617e792a45a3c66d9340b82efa048
Parents: e12ad40
Author: Parth Chandra <[email protected]>
Authored: Mon Sep 29 17:34:07 2014 -0700
Committer: Steven Phillips <[email protected]>
Committed: Mon Sep 29 18:21:46 2014 -0700

----------------------------------------------------------------------
 .../parquet/hadoop/CodecFactoryExposer.java | 33 ++++++++++++++++----
 1 file changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ef254649/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
index d6584bb..4d107e4 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
@@ -31,6 +31,7 @@ import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DirectDecompressionCodec;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import parquet.bytes.BytesInput;
@@ -38,6 +39,8 @@ import parquet.hadoop.metadata.CompressionCodecName;
 
 public class CodecFactoryExposer{
 
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CodecFactoryExposer.class);
+
   private CodecFactory codecFactory;
   private final Map<String, org.apache.hadoop.io.compress.DirectDecompressionCodec> codecByName = new HashMap<String, org.apache.hadoop.io.compress.DirectDecompressionCodec>();
   private Configuration configuration;
@@ -67,17 +70,35 @@ public class CodecFactoryExposer{
     ByteBuffer inpBuffer=compressedByteBuf.nioBuffer(0, compressedSize);
     ByteBuffer outBuffer=uncompressedByteBuf.nioBuffer(0, uncompressedSize);
     CompressionCodec c = getCodec(codecName);
-
+    //TODO: Create the decompressor only once at init time.
     Class<?> cx = c.getClass();
-    ClassLoader l = cx.getClassLoader();
-    Class<?>[] inf = cx.getInterfaces();
-    DirectDecompressionCodec d = (DirectDecompressionCodec)c;
+    DirectDecompressionCodec d=null;
+    DirectDecompressor decompr=null;
+
+    if (DirectDecompressionCodec.class.isAssignableFrom(cx)) {
+      d=(DirectDecompressionCodec)c;
+    }
 
     if(d!=null) {
-      d.createDirectDecompressor().decompress(inpBuffer, outBuffer);
+      decompr = d.createDirectDecompressor();
+    }
+
+    if(d!=null && decompr!=null){
+      decompr.decompress(inpBuffer, outBuffer);
     }else{
-      throw new DrillRuntimeException("Cannot create a decompression codec for codec "+codecName.name());
+      logger.warn("This Hadoop implementation does not support a " + codecName +
+          " direct decompression codec interface. "+
+          "Direct decompression is available only on *nix systems with Hadoop 2.3 or greater. "+
+          "Read operations will be a little slower. ");
+      BytesInput outBytesInp = this.decompress(
+          new HadoopByteBufBytesInput(inpBuffer, 0, inpBuffer.limit()),
+          uncompressedSize,
+          codecName);
+      // COPY the data back into the output buffer.
+      // (DrillBufs can only refer to direct memory, so we cannot pass back a BytesInput backed
+      // by a byte array).
+      outBuffer.put(outBytesInp.toByteArray());
     }
     return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit());
   }
