This is an automated email from the ASF dual-hosted git repository. cgivre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 95dc17befb DRILL-8325: Convert PDF Format Plugin to EVF V2 (#2664) 95dc17befb is described below commit 95dc17befb3e86c9c8fbdfb4fe9700a9889addf6 Author: Charles S. Givre <cgi...@apache.org> AuthorDate: Wed Oct 5 21:31:28 2022 -0400 DRILL-8325: Convert PDF Format Plugin to EVF V2 (#2664) --- .../drill/exec/store/pdf/PdfBatchReader.java | 42 +++++++++------------- .../drill/exec/store/pdf/PdfFormatPlugin.java | 33 ++++++----------- .../apache/drill/exec/store/pdf/TestPdfFormat.java | 2 +- 3 files changed, 27 insertions(+), 50 deletions(-) diff --git a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java index 48a6bdd549..94b4caf3bd 100644 --- a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java +++ b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java @@ -24,15 +24,15 @@ import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.CustomErrorContext; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework; -import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.physical.resultSet.RowSetLoader; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.accessor.ScalarWriter; -import org.apache.hadoop.mapred.FileSplit; import org.apache.pdfbox.pdmodel.PDDocument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,17 +52,16 @@ import java.util.Date; import java.util.GregorianCalendar; import java.util.List; -public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> { +public class PdfBatchReader implements ManagedReader { private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class); private static final String NEW_FIELD_PREFIX = "field_"; - private final int maxRecords; private final List<PdfColumnWriter> writers; private final PdfReaderConfig config; private final int startingTableIndex; + private final FileDescrip file; private PdfMetadataReader metadataReader; - private FileSplit split; private CustomErrorContext errorContext; private RowSetLoader rowWriter; private PDDocument document; @@ -73,7 +72,7 @@ public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchem private int currentTableIndex; private List<String> firstRow; private PdfRowIterator rowIterator; - private FileScanFramework.FileSchemaNegotiator negotiator; + private final FileSchemaNegotiator negotiator; private int unregisteredColumnCount; // Tables @@ -86,21 +85,16 @@ public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchem } } - public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) { - this.maxRecords = maxRecords; + public PdfBatchReader(PdfReaderConfig readerConfig, FileSchemaNegotiator negotiator) { this.unregisteredColumnCount = 0; this.writers = new ArrayList<>(); this.config = readerConfig; this.startingTableIndex = readerConfig.plugin.getConfig().defaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().defaultTableIndex(); this.currentTableIndex = this.startingTableIndex; this.columnHeaders = new ArrayList<>(); - } - @Override - public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) { this.negotiator = negotiator; - - split = negotiator.split(); + this.file = negotiator.file(); errorContext = negotiator.parentErrorContext(); builder = new SchemaBuilder(); @@ -132,7 +126,7 @@ public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchem // Support provided schema TupleMetadata schema = null; - if (negotiator.hasProvidedSchema()) { + if (negotiator.providedSchema() != null) { schema = negotiator.providedSchema(); negotiator.tableSchema(schema, false); } else { @@ -143,23 +137,19 @@ public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchem rowWriter = loader.writer(); metadataReader.setRowWriter(rowWriter); // Build the schema - if (negotiator.hasProvidedSchema()) { + if (negotiator.providedSchema() != null) { buildWriterListFromProvidedSchema(schema); } else { buildWriterList(); } metadataReader.addImplicitColumnsToSchema(); - return true; } @Override public boolean next() { while(!rowWriter.isFull()) { - if (rowWriter.limitReached(maxRecords)) { - // Stop reading if the limit has been reached - return false; - } else if (config.plugin.getConfig().combinePages() && + if (config.plugin.getConfig().combinePages() && (!rowIterator.hasNext()) && currentTableIndex < (tables.size() - 1)) { // Case for end of current page but more tables exist and combinePages is set to true. @@ -227,7 +217,7 @@ public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchem */ private void openFile() { try { - InputStream fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath()); + InputStream fsStream = negotiator.file().fileSystem().openPossiblyCompressedStream(file.split().getPath()); if (Strings.isNullOrEmpty(config.plugin.getConfig().password())) { document = PDDocument.load(fsStream); } else { @@ -239,7 +229,7 @@ public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchem } catch (Exception e) { throw UserException .dataReadError(e) - .addContext("Failed to open open input file: %s", split.getPath().toString()) + .addContext("Failed to open open input file: %s", file.split().getPath().toString()) .addContext(errorContext) .build(logger); } @@ -410,7 +400,7 @@ public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchem public static class DatePdfColumnWriter extends PdfColumnWriter { private String dateFormat; - DatePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileScanFramework.FileSchemaNegotiator negotiator) { + DatePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) { super(columnIndex, columnName, rowWriter.scalar(columnName)); ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName); @@ -441,7 +431,7 @@ public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchem public static class TimePdfColumnWriter extends PdfColumnWriter { private String dateFormat; - TimePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileScanFramework.FileSchemaNegotiator negotiator) { + TimePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) { super(columnIndex, columnName, rowWriter.scalar(columnName)); ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName); @@ -476,7 +466,7 @@ public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchem super(columnIndex, columnName, rowWriter.scalar(columnName)); } - TimestampPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileScanFramework.FileSchemaNegotiator negotiator) { + TimestampPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) { super(columnIndex, columnName, rowWriter.scalar(columnName)); ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName); diff --git a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java index 380653d66d..bd28978bcf 100644 --- a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java +++ b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java @@ -21,15 +21,13 @@ package org.apache.drill.exec.store.pdf; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; -import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory; -import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder; -import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator; -import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator; +import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader; import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.server.options.OptionSet; import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; import org.apache.drill.exec.store.dfs.easy.EasySubScan; -import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion; import org.apache.hadoop.conf.Configuration; @@ -39,16 +37,14 @@ public class PdfFormatPlugin extends EasyFormatPlugin<PdfFormatConfig> { private static class PdfReaderFactory extends FileReaderFactory { private final PdfBatchReader.PdfReaderConfig readerConfig; - private final int maxRecords; - public PdfReaderFactory(PdfBatchReader.PdfReaderConfig config, int maxRecords) { + public PdfReaderFactory(PdfBatchReader.PdfReaderConfig config) { readerConfig = config; - this.maxRecords = maxRecords; } @Override - public ManagedReader<? extends FileSchemaNegotiator> newReader() { - return new PdfBatchReader(readerConfig, maxRecords); + public ManagedReader newReader(FileSchemaNegotiator negotiator) { + return new PdfBatchReader(readerConfig, negotiator); } } @@ -68,24 +64,15 @@ public class PdfFormatPlugin extends EasyFormatPlugin<PdfFormatConfig> { .extensions(pluginConfig.extensions()) .fsConf(fsConf) .defaultName(DEFAULT_NAME) - .scanVersion(ScanFrameworkVersion.EVF_V1) + .scanVersion(ScanFrameworkVersion.EVF_V2) .supportsLimitPushdown(true) .build(); } @Override - public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) { - return new PdfBatchReader(formatConfig.getReaderConfig(this), scan.getMaxRecords()); - } - - @Override - protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) { - FileScanBuilder builder = new FileScanBuilder(); + protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) { PdfBatchReader.PdfReaderConfig readerConfig = new PdfBatchReader.PdfReaderConfig(this); - builder.setReaderFactory(new PdfReaderFactory(readerConfig, scan.getMaxRecords())); - - initScanBuilder(builder, scan); builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR)); - return builder; + builder.readerFactory(new PdfReaderFactory(readerConfig)); } } diff --git a/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java b/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java index b304629406..49856051f8 100644 --- a/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java +++ b/contrib/format-pdf/src/test/java/org/apache/drill/exec/store/pdf/TestPdfFormat.java @@ -148,7 +148,7 @@ public class TestPdfFormat extends ClusterTest { @Test public void testNoHeaders() throws RpcException { - String sql = "SELECT * " + + String sql = "SELECT field_0, field_1, field_2, field_3 " + "FROM table(cp.`pdf/argentina_diputados_voting_record.pdf` " + "(type => 'pdf', combinePages => false, extractHeaders => false)) WHERE field_2 = 'Rio Negro'";