arina-ielchiieva commented on a change in pull request #1807: DRILL-7293: Convert the regex ("log") plugin to use EVF
URL: https://github.com/apache/drill/pull/1807#discussion_r293327227
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
 ##########
 @@ -18,86 +18,224 @@
 
 package org.apache.drill.exec.store.log;
 
-import java.io.IOException;
-import org.apache.drill.exec.planner.common.DrillStatsTable;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 
-import java.util.List;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
+  public static final String PLUGIN_NAME = "logRegex";
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
+
+  private static class LogReaderFactory extends FileReaderFactory {
+    private final LogFormatPlugin plugin;
+    private final Pattern pattern;
+    private final TupleMetadata schema;
+
+    public LogReaderFactory(LogFormatPlugin plugin, Pattern pattern, 
TupleMetadata schema) {
+      this.plugin = plugin;
+      this.pattern = pattern;
+      this.schema = schema;
+    }
 
-  public static final String DEFAULT_NAME = "logRegex";
-  private final LogFormatConfig formatConfig;
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+       return new LogBatchReader(plugin.getConfig(), pattern, schema);
+    }
+  }
 
   public LogFormatPlugin(String name, DrillbitContext context,
                          Configuration fsConf, StoragePluginConfig 
storageConfig,
                          LogFormatConfig formatConfig) {
-    super(name, context, fsConf, storageConfig, formatConfig,
-        true,  // readable
-        false, // writable
-        true, // blockSplittable
-        true,  // compressible
-        Lists.newArrayList(formatConfig.getExtension()),
-        DEFAULT_NAME);
-    this.formatConfig = formatConfig;
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, 
formatConfig);
   }
 
-  @Override
-  public RecordReader getRecordReader(FragmentContext context,
-                                      DrillFileSystem dfs, FileWork fileWork, 
List<SchemaPath> columns,
-                                      String userName) throws 
ExecutionSetupException {
-    return new LogRecordReader(context, dfs, fileWork,
-        columns, userName, formatConfig);
+  private static EasyFormatConfig easyConfig(Configuration fsConf, 
LogFormatConfig pluginConfig) {
+    EasyFormatConfig config = new EasyFormatConfig();
+    config.readable = true;
+    config.writable = false;
+    // Should be block splitable, but logic not yet implemented.
+    config.blockSplittable = false;
+    config.compressible = true;
+    config.supportsProjectPushdown = true;
+    config.extensions = Lists.newArrayList(pluginConfig.getExtension());
+    config.fsConf = fsConf;
+    config.defaultName = PLUGIN_NAME;
+    config.readerOperatorType = CoreOperatorType.REGEX_SUB_SCAN_VALUE;
+    config.useEnhancedScan = true;
+    return config;
   }
 
   @Override
-  public boolean supportsPushDown() {
-    return true;
-  }
+  protected FileScanBuilder frameworkBuilder(
+      OptionManager options, EasySubScan scan) throws ExecutionSetupException {
 
-  @Override
-  public RecordWriter getRecordWriter(FragmentContext context,
-                                      EasyWriter writer) throws 
UnsupportedOperationException {
-    throw new UnsupportedOperationException("unimplemented");
-  }
+    // Pattern and schema identical across readers; define
+    // up front.
 
-  @Override
-  public int getReaderOperatorType() {
-    return UserBitShared.CoreOperatorType.REGEX_SUB_SCAN_VALUE;
+    Pattern pattern = setupPattern();
+    Matcher m = pattern.matcher("test");
+    int capturingGroups = m.groupCount();
+    TupleMetadata outputSchema = defineOutputSchema(capturingGroups);
+    TupleMetadata readerSchema = defineReaderSchema(outputSchema);
+
+    // Use the file framework to enable support for implicit and partition
+    // columns.
+
+    FileScanBuilder builder = new FileScanBuilder();
+
+    // Pass along the class that will create a batch reader on demand for
+    // each input file.
+
+    builder.setReaderFactory(new LogReaderFactory(this, pattern, 
readerSchema));
+
+    // The default type of regex columns is nullable VarChar,
+    // so let's use that as the missing column type.
+
+    builder.setNullType(Types.optional(MinorType.VARCHAR));
+
+    // This plugin was created before the concept of "provided schema" was
+    // available. Use the schema obtained from config as the provided schema.
+    // However if a schema is provided, use that instead. No attempt is made
+    // to merge the two schemas: a provided schema simply replaces that defined
+    // in the plugin config. The normal use case would be to define columns in
+    // the plugin config, types in the provided schema.
+
+    TupleMetadata finalSchema = scan.getSchema() == null ? outputSchema : 
scan.getSchema();
+    builder.typeConverterBuilder().providedSchema(finalSchema);
+    return builder;
   }
 
-  @Override
-  public int getWriterOperatorType() {
-    throw new UnsupportedOperationException("unimplemented");
+  /**
+   * Define the output schema: the schema after type conversions.
+   * Does not include the special columns as those are added only when
+   * requested, and are always VARCHAR.
 
 Review comment:
   Add `@param` and `@return` tags to the Javadoc.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to