This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 7d33bf1 DRILL-5674: Support ZIP compression 7d33bf1 is described below commit 7d33bf102e50089536b2c7c95d08ffb58aebf78e Author: Arina Ielchiieva <arina.yelchiy...@gmail.com> AuthorDate: Fri Oct 18 18:22:15 2019 +0300 DRILL-5674: Support ZIP compression 1. Added ZipCodec implementation which can read / write single file. 2. Revisited Drill plugin formats to ensure 'openPossiblyCompressedStream' method is used in those which support compression. 3. Added unit tests. 4. General refactoring. --- .../drill/exec/store/ltsv/LTSVFormatPlugin.java | 15 +-- .../drill/exec/store/ltsv/LTSVRecordReader.java | 32 ++--- .../exec/store/syslog/SyslogFormatPlugin.java | 20 ++- .../exec/store/syslog/SyslogRecordReader.java | 59 +++------ .../planner/logical/partition/PruneScanRule.java | 2 +- .../drill/exec/store/dfs/BasicFormatMatcher.java | 65 +++++----- .../apache/drill/exec/store/dfs/FileSelection.java | 71 +++++------ .../drill/exec/store/dfs/FileSystemPlugin.java | 43 ++++++- .../drill/exec/store/dfs/FormatSelection.java | 11 +- .../org/apache/drill/exec/store/dfs/ZipCodec.java | 141 +++++++++++++++++++++ .../drill/exec/store/log/LogBatchReader.java | 2 +- .../exec/store/parquet/ParquetFormatPlugin.java | 12 +- .../drill/exec/store/pcap/PcapBatchReader.java | 27 +--- .../drill/exec/store/pcap/PcapFormatPlugin.java | 11 +- .../exec/store/pcapng/PcapngFormatPlugin.java | 2 +- .../exec/store/pcapng/PcapngRecordReader.java | 12 +- .../drill/exec/store/pcapng/package-info.java | 2 +- .../drill/exec/store/dfs/TestCompressedFiles.java | 111 ++++++++++++++++ 18 files changed, 431 insertions(+), 207 deletions(-) diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java index 8ff62ed..7284409 100644 --- 
a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java +++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.ltsv; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.ops.FragmentContext; @@ -34,17 +33,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import java.io.IOException; import java.util.List; public class LTSVFormatPlugin extends EasyFormatPlugin<LTSVFormatPluginConfig> { - private static final boolean IS_COMPRESSIBLE = false; + private static final boolean IS_COMPRESSIBLE = true; private static final String DEFAULT_NAME = "ltsv"; - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LTSVFormatPlugin.class); - public LTSVFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) { this(name, context, fsConf, storageConfig, new LTSVFormatPluginConfig()); } @@ -54,7 +50,7 @@ public class LTSVFormatPlugin extends EasyFormatPlugin<LTSVFormatPluginConfig> { } @Override - public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) throws ExecutionSetupException { + public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) { return new LTSVRecordReader(context, fileWork.getPath(), dfs, columns); } @@ -75,7 +71,7 @@ public class LTSVFormatPlugin extends EasyFormatPlugin<LTSVFormatPluginConfig> { } @Override - public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException { + public RecordWriter 
getRecordWriter(FragmentContext context, EasyWriter writer) { throw new UnsupportedOperationException("Drill doesn't currently support writing to LTSV files."); } @@ -85,13 +81,12 @@ public class LTSVFormatPlugin extends EasyFormatPlugin<LTSVFormatPluginConfig> { } @Override - public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException { + public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) { throw new UnsupportedOperationException("unimplemented"); } @Override - public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException { + public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) { throw new UnsupportedOperationException("unimplemented"); } - } diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java index cb23850..619ceb1 100644 --- a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java +++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.store.ltsv; import io.netty.buffer.DrillBuf; import org.apache.drill.common.AutoCloseables; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.OutOfMemoryException; @@ -30,11 +29,13 @@ import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; 
import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.text.ParseException; @@ -46,13 +47,13 @@ import java.util.Set; public class LTSVRecordReader extends AbstractRecordReader { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LTSVRecordReader.class); + private static final Logger logger = LoggerFactory.getLogger(LTSVRecordReader.class); private static final int MAX_RECORDS_PER_BATCH = 8096; private final String inputPath; - private final FSDataInputStream fsStream; + private final InputStream fsStream; private final BufferedReader reader; @@ -64,14 +65,14 @@ public class LTSVRecordReader extends AbstractRecordReader { List<SchemaPath> columns) throws OutOfMemoryException { this.inputPath = path.toUri().getPath(); try { - this.fsStream = fileSystem.open(path); - this.reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), StandardCharsets.UTF_8)); + this.fsStream = fileSystem.openPossiblyCompressedStream(path); + this.reader = new BufferedReader(new InputStreamReader(fsStream, StandardCharsets.UTF_8)); this.buffer = fragmentContext.getManagedBuffer(); setColumns(columns); - } catch (IOException e) { - String msg = String.format("Failed to open input file: %s", inputPath); - throw UserException.dataReadError(e).message(msg).build(logger); + throw UserException.dataReadError(e) + .message(String.format("Failed to open input file: %s", inputPath)) + .build(logger); } } @@ -79,16 +80,14 @@ public class LTSVRecordReader extends AbstractRecordReader { protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projected) { Set<SchemaPath> transformed = new LinkedHashSet<>(); if (!isStarQuery()) { - for (SchemaPath column : projected) { - transformed.add(column); - } + transformed.addAll(projected); } else { transformed.add(SchemaPath.STAR_COLUMN); } return transformed; } - public 
void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException { + public void setup(final OperatorContext context, final OutputMutator output) { this.writer = new VectorContainerWriter(output); } @@ -100,7 +99,7 @@ public class LTSVRecordReader extends AbstractRecordReader { try { BaseWriter.MapWriter map = this.writer.rootAsMap(); - String line = null; + String line; while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) { // Skip empty lines @@ -145,7 +144,9 @@ public class LTSVRecordReader extends AbstractRecordReader { } catch (final Exception e) { String msg = String.format("Failure while reading messages from %s. Record reader was at record: %d", inputPath, recordCount + 1); - throw UserException.dataReadError(e).message(msg).build(logger); + throw UserException.dataReadError(e) + .message(msg) + .build(logger); } } @@ -161,5 +162,4 @@ public class LTSVRecordReader extends AbstractRecordReader { public void close() throws Exception { AutoCloseables.close(reader, fsStream); } - } diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java index 6f81ac6..d21035b 100644 --- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java +++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java @@ -17,13 +17,12 @@ */ package org.apache.drill.exec.store.syslog; -import java.io.IOException; -import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.common.exceptions.ExecutionSetupException; + import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.ops.FragmentContext; +import 
org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics; +import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.RecordWriter; @@ -31,14 +30,13 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; import org.apache.drill.exec.store.dfs.easy.EasyWriter; import org.apache.drill.exec.store.dfs.easy.FileWork; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; -import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; - - -import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import java.util.List; + public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> { public static final String DEFAULT_NAME = "syslog"; @@ -59,7 +57,7 @@ public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> { @Override public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, - List<SchemaPath> columns, String userName) throws ExecutionSetupException { + List<SchemaPath> columns, String userName) { return new SyslogRecordReader(context, dfs, fileWork, columns, userName, formatConfig); } @@ -90,12 +88,12 @@ public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> { } @Override - public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException { + public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) { throw new UnsupportedOperationException("unimplemented"); } @Override - public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException { + public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) { throw new 
UnsupportedOperationException("unimplemented"); } } diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java index 4b2831c..0f39887 100644 --- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java +++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.store.syslog; import io.netty.buffer.DrillBuf; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.OutOfMemoryException; @@ -34,18 +33,20 @@ import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter; import org.realityforge.jsyslog.message.StructuredDataParameter; import org.realityforge.jsyslog.message.SyslogMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.List; import java.util.Map; -import java.util.Iterator; public class SyslogRecordReader extends AbstractRecordReader { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SyslogRecordReader.class); + private static final Logger logger = LoggerFactory.getLogger(SyslogRecordReader.class); private static final int MAX_RECORDS_PER_BATCH = 4096; private final DrillFileSystem fileSystem; @@ -54,16 +55,13 @@ public class SyslogRecordReader extends AbstractRecordReader { private BufferedReader reader; private DrillBuf buffer; private VectorContainerWriter writer; - private SyslogFormatConfig config; - private int 
maxErrors; - private boolean flattenStructuredData; + private final int maxErrors; + private final boolean flattenStructuredData; private int errorCount; private int lineCount; - private List<SchemaPath> projectedColumns; + private final List<SchemaPath> projectedColumns; private String line; - private SimpleDateFormat df; - public SyslogRecordReader(FragmentContext context, DrillFileSystem fileSystem, FileWork fileWork, @@ -74,9 +72,7 @@ public class SyslogRecordReader extends AbstractRecordReader { this.fileSystem = fileSystem; this.fileWork = fileWork; this.userName = userName; - this.config = config; this.maxErrors = config.getMaxErrors(); - this.df = getValidDateObject("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); this.errorCount = 0; this.buffer = context.getManagedBuffer().reallocIfNeeded(4096); this.projectedColumns = columns; @@ -86,7 +82,7 @@ public class SyslogRecordReader extends AbstractRecordReader { } @Override - public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException { + public void setup(final OperatorContext context, final OutputMutator output) { openFile(); this.writer = new VectorContainerWriter(output); } @@ -94,7 +90,7 @@ public class SyslogRecordReader extends AbstractRecordReader { private void openFile() { InputStream in; try { - in = fileSystem.open(fileWork.getPath()); + in = fileSystem.openPossiblyCompressedStream(fileWork.getPath()); } catch (Exception e) { throw UserException .dataReadError(e) @@ -115,7 +111,7 @@ public class SyslogRecordReader extends AbstractRecordReader { try { BaseWriter.MapWriter map = this.writer.rootAsMap(); - String line = null; + String line; while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) { lineCount++; @@ -288,7 +284,7 @@ public class SyslogRecordReader extends AbstractRecordReader { return; } try { - byte[] bytes = value.getBytes("UTF-8"); + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); int stringLength = bytes.length; 
this.buffer = buffer.reallocIfNeeded(stringLength); this.buffer.setBytes(0, bytes, 0, stringLength); @@ -304,18 +300,10 @@ public class SyslogRecordReader extends AbstractRecordReader { //Helper function to flatten structured data private void mapFlattenedStructuredData(Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) { - Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = data.entrySet().iterator(); - while (entries.hasNext()) { - Map.Entry<String, List<StructuredDataParameter>> entry = entries.next(); - - List<StructuredDataParameter> dataParameters = entry.getValue(); - String fieldName; - String fieldValue; - - for (StructuredDataParameter parameter : dataParameters) { - fieldName = "structured_data_" + parameter.getName(); - fieldValue = parameter.getValue(); - + for (Map.Entry<String, List<StructuredDataParameter>> entry : data.entrySet()) { + for (StructuredDataParameter parameter : entry.getValue()) { + String fieldName = "structured_data_" + parameter.getName(); + String fieldValue = parameter.getValue(); mapStringField(fieldName, fieldValue, map); } } @@ -323,28 +311,19 @@ public class SyslogRecordReader extends AbstractRecordReader { //Gets field from the Structured Data Construct private String getFieldFromStructuredData(String fieldName, SyslogMessage parsedMessage) { - String result = null; - Map<String, List<StructuredDataParameter>> structuredData = parsedMessage.getStructuredData(); - Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = parsedMessage.getStructuredData().entrySet().iterator(); - while (entries.hasNext()) { - Map.Entry<String, List<StructuredDataParameter>> entry = entries.next(); - List<StructuredDataParameter> dataParameters = entry.getValue(); - - for (StructuredDataParameter d : dataParameters) { + for (Map.Entry<String, List<StructuredDataParameter>> entry : parsedMessage.getStructuredData().entrySet()) { + for (StructuredDataParameter d : entry.getValue()) { if 
(d.getName().equals(fieldName)) { return d.getValue(); } } } - return result; + return null; } //Helper function to map arrays private void mapComplexField(String mapName, Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) { - Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = data.entrySet().iterator(); - while (entries.hasNext()) { - Map.Entry<String, List<StructuredDataParameter>> entry = entries.next(); - + for (Map.Entry<String, List<StructuredDataParameter>> entry : data.entrySet()) { List<StructuredDataParameter> dataParameters = entry.getValue(); String fieldName; String fieldValue; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java index c3470d8..b72ecee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java @@ -544,7 +544,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { if (scan instanceof EnumerableTableScan) { final Object selection = DrillRelOptUtil.getDrillTable(scan).getSelection(); if (selection instanceof FormatSelection - && ((FormatSelection)selection).supportDirPruning()) { + && ((FormatSelection)selection).supportsDirPruning()) { return true; // Do directory-based pruning in Calcite logical } else { return false; // Do not do directory-based pruning in Calcite logical diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java index 6fa1793..78cb4e6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java @@ -22,34 
+22,30 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.shaded.guava.com.google.common.collect.Range; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; -import com.fasterxml.jackson.annotation.JsonIgnore; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.shaded.guava.com.google.common.collect.Range; - public class BasicFormatMatcher extends FormatMatcher { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class); protected final FormatPlugin plugin; - protected final boolean compressible; - protected final CompressionCodecFactory codecFactory; + private final boolean compressible; + private final CompressionCodecFactory codecFactory; private final List<Pattern> patterns; private final MagicStringMatcher matcher; public BasicFormatMatcher(FormatPlugin plugin, List<Pattern> patterns, List<MagicString> magicStrings) { - super(); - this.patterns = ImmutableList.copyOf(patterns); + this.patterns = new ArrayList<>(patterns); this.matcher = new MagicStringMatcher(magicStrings); this.plugin = plugin; this.compressible = false; @@ -57,12 +53,10 @@ public class BasicFormatMatcher extends FormatMatcher { } public BasicFormatMatcher(FormatPlugin plugin, Configuration fsConf, List<String> extensions, boolean compressible) { - List<Pattern> patterns = Lists.newArrayList(); - 
for (String extension : extensions) { - patterns.add(Pattern.compile(".*\\." + extension)); - } - this.patterns = patterns; - this.matcher = new MagicStringMatcher(new ArrayList<MagicString>()); + this.patterns = extensions.stream() + .map(extension -> Pattern.compile(".*\\." + extension)) + .collect(Collectors.toList()); + this.matcher = new MagicStringMatcher(new ArrayList<>()); this.plugin = plugin; this.compressible = compressible; this.codecFactory = new CompressionCodecFactory(fsConf); @@ -84,8 +78,12 @@ public class BasicFormatMatcher extends FormatMatcher { return null; } - /* - * Function returns true if the file extension matches the pattern + /** + * Function returns true if the file extension matches the pattern. + * + * @param fs file system + * @param status file status + * @return true if file is readable, false otherwise */ @Override public boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException { @@ -109,10 +107,7 @@ public class BasicFormatMatcher extends FormatMatcher { } } - if (matcher.matches(fs, status)) { - return true; - } - return false; + return matcher.matches(fs, status); } @Override @@ -121,16 +116,14 @@ public class BasicFormatMatcher extends FormatMatcher { return plugin; } + private static class MagicStringMatcher { - private class MagicStringMatcher { - - private List<RangeMagics> ranges; + private final List<RangeMagics> ranges; - public MagicStringMatcher(List<MagicString> magicStrings) { - ranges = Lists.newArrayList(); - for(MagicString ms : magicStrings) { - ranges.add(new RangeMagics(ms)); - } + MagicStringMatcher(List<MagicString> magicStrings) { + this.ranges = magicStrings.stream() + .map(RangeMagics::new) + .collect(Collectors.toList()); } public boolean matches(DrillFileSystem fs, FileStatus status) throws IOException{ @@ -168,15 +161,15 @@ public class BasicFormatMatcher extends FormatMatcher { return false; } - private class RangeMagics{ - Range<Long> range; - byte[][] magics; + private static 
class RangeMagics { + + private final Range<Long> range; + private final byte[][] magics; - public RangeMagics(MagicString ms) { - this.range = Range.closedOpen( ms.getOffset(), (long) ms.getBytes().length); + RangeMagics(MagicString ms) { + this.range = Range.closedOpen(ms.getOffset(), (long) ms.getBytes().length); this.magics = new byte[][]{ms.getBytes()}; } } } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java index 902abda..a1fccb7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java @@ -17,6 +17,16 @@ */ package org.apache.drill.exec.store.dfs; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.util.DrillFileSystemUtil; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -26,20 +36,12 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; -import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.exec.util.DrillFileSystemUtil; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; - /** * Jackson serializable description of a file selection. 
*/ public class FileSelection { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSelection.class); + + private static final Logger logger = LoggerFactory.getLogger(FileSelection.class); private static final String WILD_CARD = "*"; private List<FileStatus> statuses; @@ -73,10 +75,10 @@ public class FileSelection { } private StatusType dirStatus; - // whether this selection previously had a wildcard - private boolean hadWildcard = false; - // whether all partitions were previously pruned for this selection - private boolean wasAllPartitionsPruned = false; + // whether this selection previously had a wildcard, false by default + private boolean hadWildcard; + // whether all partitions were previously pruned for this selection, false by default + private boolean wasAllPartitionsPruned; /** * Creates a {@link FileSelection selection} out of given file statuses/files and selection root. @@ -108,7 +110,7 @@ public class FileSelection { * Copy constructor for convenience. */ protected FileSelection(FileSelection selection) { - Preconditions.checkNotNull(selection, "selection cannot be null"); + Preconditions.checkNotNull(selection, "File selection cannot be null"); this.statuses = selection.statuses; this.files = selection.files; this.selectionRoot = selection.selectionRoot; @@ -117,6 +119,7 @@ public class FileSelection { this.metaContext = selection.metaContext; this.hadWildcard = selection.hadWildcard; this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned; + this.emptyDirectory = selection.emptyDirectory; } public Path getSelectionRoot() { @@ -127,8 +130,8 @@ public class FileSelection { Stopwatch timer = logger.isDebugEnabled() ? 
Stopwatch.createStarted() : null; if (statuses == null) { - List<FileStatus> newStatuses = Lists.newArrayList(); - for (Path pathStr : files) { + List<FileStatus> newStatuses = new ArrayList<>(); + for (Path pathStr : Objects.requireNonNull(files, "Files can not be null if statuses are null")) { newStatuses.add(fs.getFileStatus(pathStr)); } statuses = newStatuses; @@ -144,11 +147,9 @@ public class FileSelection { public List<Path> getFiles() { if (files == null) { - List<Path> newFiles = Lists.newArrayList(); - for (FileStatus status:statuses) { - newFiles.add(status.getPath()); - } - files = newFiles; + files = Objects.requireNonNull(statuses, "Statuses can not be null if files are null").stream() + .map(FileStatus::getPath) + .collect(Collectors.toList()); } return files; } @@ -386,9 +387,8 @@ public class FileSelection { Preconditions.checkArgument(!combinedPath.isEmpty(), "Empty path (" + combinedPath + "( in file selection path."); if (!combinedPath.startsWith(parent)) { - StringBuilder msg = new StringBuilder(); - msg.append("Invalid path : ").append(subpath).append(" takes you outside the workspace."); - throw new IllegalArgumentException(msg.toString()); + throw new IllegalArgumentException( + String.format("Invalid path [%s] takes you outside the workspace.", subpath)); } } @@ -396,7 +396,7 @@ public class FileSelection { return statuses; } - public boolean supportDirPrunig() { + public boolean supportsDirPruning() { if (isExpandedFully() || isExpandedPartial()) { if (!wasAllPartitionsPruned) { return true; @@ -440,26 +440,15 @@ public class FileSelection { this.emptyDirectory = true; } - @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("root=").append(this.selectionRoot); - + sb.append("root=").append(selectionRoot); sb.append("files=["); - boolean isFirst = true; - for (Path file : this.files) { - if (isFirst) { - isFirst = false; - sb.append(file); - } else { - sb.append(","); - sb.append(file); - } - } + 
sb.append(getFiles().stream() + .map(Path::toString) + .collect(Collectors.joining(", "))); sb.append("]"); - return sb.toString(); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java index cf1333d..8a97701 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.dfs; import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,9 +34,9 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.metastore.MetadataProviderManager; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; -import org.apache.drill.exec.metastore.MetadataProviderManager; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.options.SessionOptionManager; import org.apache.drill.exec.store.AbstractStoragePlugin; @@ -43,11 +44,14 @@ import org.apache.drill.exec.store.ClassPathFileSystem; import org.apache.drill.exec.store.LocalSyncableFileSystem; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.StoragePluginOptimizerRule; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; - +import org.apache.drill.shaded.guava.com.google.common.base.Strings; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; import 
org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet.Builder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Storage engine associated with a Hadoop FileSystem Implementation. Examples include HDFS, MapRFS, QuantacastFileSystem, @@ -57,7 +61,14 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet.Buil */ public class FileSystemPlugin extends AbstractStoragePlugin { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class); + private static final Logger logger = LoggerFactory.getLogger(FileSystemPlugin.class); + + /** + * org.apache.hadoop.io.compress library supports such codecs as Gzip and Bzip2 out of box. + * This list stores only codecs that are missing in Hadoop library. + */ + private static final List<String> ADDITIONAL_CODECS = Collections.singletonList( + ZipCodec.class.getCanonicalName()); private final FileSystemSchemaFactory schemaFactory; private final FormatCreator formatCreator; @@ -80,6 +91,8 @@ public class FileSystemPlugin extends AbstractStoragePlugin { fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName()); fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName()); + addCodecs(fsConf); + if (isS3Connection(fsConf)) { handleS3Credentials(fsConf); } @@ -111,6 +124,24 @@ public class FileSystemPlugin extends AbstractStoragePlugin { } } + /** + * Merges codecs from configuration with the {@link #ADDITIONAL_CODECS} + * and updates configuration property. + * Drill built-in codecs are added at the beginning of the codecs string + * so config codecs can override Drill ones. 
+ * + * @param conf Hadoop configuration + */ + private void addCodecs(Configuration conf) { + String confCodecs = conf.get(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY); + String builtInCodecs = String.join(",", ADDITIONAL_CODECS); + String newCodecs = Strings.isNullOrEmpty(confCodecs) + ? builtInCodecs + : builtInCodecs + ", " + confCodecs; + logger.trace("Codecs: {}", newCodecs); + conf.set(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY, newCodecs); + } + private boolean isS3Connection(Configuration conf) { URI uri = FileSystem.getDefaultUri(conf); return uri.getScheme().equals("s3a"); @@ -131,7 +162,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin { for (String key : credentialKeys) { char[] credentialChars = conf.getPassword(key); if (credentialChars == null) { - logger.warn(String.format("Property '%s' is absent.", key)); + logger.warn("Property '{}' is absent.", key); } else { conf.set(key, String.valueOf(credentialChars)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java index 7d7bcfa..d2a5545 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java @@ -17,18 +17,15 @@ */ package org.apache.drill.exec.store.dfs; -import java.util.List; - -import org.apache.drill.common.logical.FormatPluginConfig; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.hadoop.fs.Path; +import java.util.List; public class FormatSelection { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatSelection.class); private FormatPluginConfig format; private FileSelection selection; @@ -62,7 +59,7 
@@ public class FormatSelection { } @JsonIgnore - public boolean supportDirPruning() { - return selection.supportDirPrunig(); + public boolean supportsDirPruning() { + return selection.supportsDirPruning(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ZipCodec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ZipCodec.java new file mode 100644 index 0000000..c613076 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ZipCodec.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.dfs; + +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.DefaultCodec; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; + +/** + * ZIP codec implementation which can read or create single entry. + * <p/> + * Note: Do not rename this class. 
Class naming must be 'ZipCodec' so it can be mapped by + * {@link org.apache.hadoop.io.compress.CompressionCodecFactory} to the 'zip' extension. + */ +public class ZipCodec extends DefaultCodec { + + private static final String EXTENSION = ".zip"; + + @Override + public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { + return new ZipCompressionOutputStream(new ResetableZipOutputStream(out)); + } + + @Override + public CompressionInputStream createInputStream(InputStream in) throws IOException { + return new ZipCompressionInputStream(new ZipInputStream(in)); + } + + @Override + public String getDefaultExtension() { + return EXTENSION; + } + + /** + * Reads only first entry from {@link ZipInputStream}, + * other entries if present will be ignored. + */ + private static class ZipCompressionInputStream extends CompressionInputStream { + + ZipCompressionInputStream(ZipInputStream in) throws IOException { + super(in); + // positions stream at the beginning of the first entry data + in.getNextEntry(); + } + + @Override + public int read() throws IOException { + return in.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return in.read(b, off, len); + } + + @Override + public void resetState() throws IOException { + in.reset(); + } + + @Override + public void close() throws IOException { + try { + ((ZipInputStream) in).closeEntry(); + } finally { + super.close(); + } + } + } + + /** + * Extends {@link ZipOutputStream} to allow resetting compressor stream, + * required by {@link CompressionOutputStream} implementation. + */ + private static class ResetableZipOutputStream extends ZipOutputStream { + + ResetableZipOutputStream(OutputStream out) { + super(out); + } + + void resetState() { + def.reset(); + } + } + + /** + * Writes given data into ZIP archive by placing all data in one entry with default naming. 
+ */ + private static class ZipCompressionOutputStream extends CompressionOutputStream { + + private static final String DEFAULT_ENTRY_NAME = "entry.out"; + + ZipCompressionOutputStream(ResetableZipOutputStream out) throws IOException { + super(out); + ZipEntry zipEntry = new ZipEntry(DEFAULT_ENTRY_NAME); + out.putNextEntry(zipEntry); + } + + @Override + public void write(int b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + + @Override + public void finish() throws IOException { + ((ResetableZipOutputStream) out).closeEntry(); + } + + @Override + public void resetState() { + ((ResetableZipOutputStream) out).resetState(); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java index 6d8b533..0ed71db 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java @@ -178,7 +178,7 @@ public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> { private void openFile(FileSchemaNegotiator negotiator) { InputStream in; try { - in = negotiator.fileSystem().open(split.getPath()); + in = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath()); } catch (Exception e) { throw UserException .dataReadError(e) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java index 985fb20..9129fc8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.databind.DeserializationFeature; 
import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,13 +61,11 @@ import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.dfs.FormatSelection; import org.apache.drill.exec.store.dfs.MagicString; import org.apache.drill.exec.store.dfs.MetadataContext; -import org.apache.drill.exec.store.mock.MockStorageEngine; import org.apache.drill.exec.store.parquet.metadata.Metadata; import org.apache.drill.exec.store.parquet.metadata.ParquetTableMetadataDirs; import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -73,19 +73,21 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ParquetFormatPlugin implements FormatPlugin { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class); + private static final Logger logger = LoggerFactory.getLogger(ParquetFormatPlugin.class); public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); private static final String DEFAULT_NAME = "parquet"; - private static final List<Pattern> PATTERNS = Lists.newArrayList( + private static final List<Pattern> PATTERNS = Arrays.asList( Pattern.compile(".*\\.parquet$"), Pattern.compile(".*/" + 
ParquetFileWriter.PARQUET_METADATA_FILE)); - private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC)); + private static final List<MagicString> MAGIC_STRINGS = Collections.singletonList(new MagicString(0, ParquetFileWriter.MAGIC)); private final DrillbitContext context; private final Configuration fsConf; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java index bc86916..c9caa24 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java @@ -28,15 +28,13 @@ import org.apache.drill.exec.store.pcap.decoder.Packet; import org.apache.drill.exec.store.pcap.decoder.PacketDecoder; import org.apache.drill.exec.store.pcap.schema.Schema; import org.apache.drill.exec.vector.accessor.ScalarWriter; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.mapred.FileSplit; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; - -import org.apache.hadoop.fs.Path; +import java.io.InputStream; import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII; @@ -48,15 +46,9 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { private FileSplit split; - private PcapReaderConfig readerConfig; - private PacketDecoder decoder; - private ResultSetLoader loader; - - private FSDataInputStream fsStream; - - private Schema pcapSchema; + private InputStream fsStream; private RowSetLoader rowWriter; @@ -135,7 +127,6 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { } public PcapBatchReader(PcapReaderConfig readerConfig) { - this.readerConfig = readerConfig; } @Override @@ -143,10 +134,10 @@ public class PcapBatchReader implements 
ManagedReader<FileSchemaNegotiator> { split = negotiator.split(); openFile(negotiator); SchemaBuilder builder = new SchemaBuilder(); - pcapSchema = new Schema(); + Schema pcapSchema = new Schema(); TupleMetadata schema = pcapSchema.buildSchema(builder); negotiator.setTableSchema(schema, false); - loader = negotiator.build(); + ResultSetLoader loader = negotiator.build(); // Creates writers for all fields (Since schema is known) rowWriter = loader.writer(); @@ -208,14 +199,13 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { .build(logger); } fsStream = null; - this.buffer = null; - this.decoder = null; + buffer = null; + decoder = null; } private void openFile(FileSchemaNegotiator negotiator) { try { - String filePath = split.getPath().toString(); - fsStream = negotiator.fileSystem().open(new Path(filePath)); + fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath()); decoder = new PacketDecoder(fsStream); buffer = new byte[BUFFER_SIZE + decoder.getMaxLength()]; validBytes = fsStream.read(buffer); @@ -237,9 +227,6 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { getNextPacket(rowWriter); } - if (packet == null) { - return false; - } int old = offset; offset = decoder.decodePacket(buffer, offset, packet, decoder.getMaxLength(), validBytes); if (offset > validBytes) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java index 1678196..b8e0175 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java @@ -25,7 +25,6 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanB import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; import org.apache.drill.exec.server.options.OptionManager; 
import org.apache.drill.exec.store.dfs.easy.EasySubScan; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.server.DrillbitContext; @@ -34,9 +33,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.drill.exec.store.pcap.PcapBatchReader.PcapReaderConfig; public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> { + public static final String PLUGIN_NAME = "pcap"; private static class PcapReaderFactory extends FileReaderFactory { + private final PcapReaderConfig readerConfig; public PcapReaderFactory(PcapReaderConfig config) { @@ -48,6 +49,7 @@ public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> { return new PcapBatchReader(readerConfig); } } + public PcapFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig, PcapFormatConfig formatConfig) { @@ -58,7 +60,7 @@ public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> { EasyFormatConfig config = new EasyFormatConfig(); config.readable = true; config.writable = false; - config.blockSplittable = true; + config.blockSplittable = false; config.compressible = true; config.supportsProjectPushdown = true; config.extensions = pluginConfig.getExtensions(); @@ -70,13 +72,12 @@ public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> { } @Override - public ManagedReader<? extends FileSchemaNegotiator> newBatchReader( - EasySubScan scan, OptionManager options) throws ExecutionSetupException { + public ManagedReader<? 
extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) { return new PcapBatchReader(new PcapReaderConfig(this)); } @Override - protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException { + protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) { FileScanBuilder builder = new FileScanBuilder(); builder.setReaderFactory(new PcapReaderFactory(new PcapReaderConfig(this))); initScanBuilder(builder, scan); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java index de9a558..41be760 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java @@ -47,7 +47,7 @@ public class PcapngFormatPlugin extends EasyFormatPlugin<PcapngFormatConfig> { public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, PcapngFormatConfig formatPluginConfig) { super(name, context, fsConf, config, formatPluginConfig, true, - false, true, false, + false, false, true, formatPluginConfig.getExtensions(), DEFAULT_NAME); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java index 0ad234d..152e2e6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java @@ -31,18 +31,18 @@ import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.AbstractRecordReader; +import 
org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.pcapng.schema.Column; import org.apache.drill.exec.store.pcapng.schema.DummyArrayImpl; import org.apache.drill.exec.store.pcapng.schema.DummyImpl; import org.apache.drill.exec.store.pcapng.schema.Schema; import org.apache.drill.exec.vector.ValueVector; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -59,14 +59,14 @@ public class PcapngRecordReader extends AbstractRecordReader { private final Path pathToFile; private OutputMutator output; private List<ProjectedColumnInfo> projectedCols; - private FileSystem fs; - private FSDataInputStream in; + private DrillFileSystem fs; + private InputStream in; private List<SchemaPath> columns; private Iterator<IPcapngType> it; public PcapngRecordReader(final Path pathToFile, - final FileSystem fileSystem, + final DrillFileSystem fileSystem, final List<SchemaPath> columns) { this.fs = fileSystem; this.pathToFile = fs.makeQualified(pathToFile); @@ -79,7 +79,7 @@ public class PcapngRecordReader extends AbstractRecordReader { try { this.output = output; - this.in = fs.open(pathToFile); + this.in = fs.openPossiblyCompressedStream(pathToFile); PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in)); decoder.decode(); this.it = decoder.getSectionList().iterator(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java index dafeaa3..493a3b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java @@ -16,7 +16,7 @@ * limitations 
under the License. */ /** - * For comments on realization of this format plugin look at : + * For comments on implementation of this format plugin see: * * @see <a href="https://issues.apache.org/jira/browse/DRILL-6179"> Jira</a> */ diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestCompressedFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestCompressedFiles.java new file mode 100644 index 0000000..fd1bf9e --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestCompressedFiles.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.dfs; + +import org.apache.drill.categories.UnlikelyTest; +import org.apache.drill.exec.ExecTest; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.apache.drill.test.ClusterTest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import static org.junit.Assert.assertNotNull; + +@Category(UnlikelyTest.class) +public class TestCompressedFiles extends ClusterTest { + + private static FileSystem fs; + private static CompressionCodecFactory factory; + + @BeforeClass + public static void setup() throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher); + startCluster(builder); + + fs = ExecTest.getLocalFileSystem(); + Configuration conf = fs.getConf(); + conf.set(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY, ZipCodec.class.getCanonicalName()); + factory = new CompressionCodecFactory(conf); + } + + @Test + public void testGzip() throws Exception { + String fileName = "gz_data.csvh.gz"; + writeData(fileName, "gzip", "id,name\n1,Fred\n2,Wilma"); + + testBuilder() + .sqlQuery("select * from dfs.`root`.`%s`", fileName) + .unOrdered() + .baselineColumns("id", "name") + .baselineValues("1", "Fred") + .baselineValues("2", "Wilma") + .go(); + } + + @Test + public void testBzip2() throws Exception { + String fileName = "bzip2_data.csvh.bz2"; + writeData(fileName, "bzip2", "id,name\n3,Bamm-Bamm\n4,Barney"); + + testBuilder() + 
.sqlQuery("select * from dfs.`root`.`%s`", fileName) + .unOrdered() + .baselineColumns("id", "name") + .baselineValues("3", "Bamm-Bamm") + .baselineValues("4", "Barney") + .go(); + } + + @Test + public void testZip() throws Exception { + String fileName = "zip_data.csvh.zip"; + writeData(fileName, "zip", "id,name\n5,Dino\n6,Pebbles"); + + testBuilder() + .sqlQuery("select * from dfs.`root`.`%s`", fileName) + .unOrdered() + .baselineColumns("id", "name") + .baselineValues("5", "Dino") + .baselineValues("6", "Pebbles") + .go(); + } + + private void writeData(String fileName, String codecName, String data) throws IOException { + CompressionCodec codec = factory.getCodecByName(codecName); + assertNotNull(codecName + " is not found", codec); + Path outFile = new Path(dirTestWatcher.getRootDir().getAbsolutePath(), fileName); + try (InputStream inputStream = new ByteArrayInputStream(data.getBytes()); + OutputStream outputStream = codec.createOutputStream(fs.create(outFile))) { + IOUtils.copyBytes(inputStream, outputStream, fs.getConf(), false); + } + } +}