Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1114#discussion_r167129411 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java --- @@ -0,0 +1,261 @@ +package org.apache.drill.exec.store.log; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.CompressionInputStream; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class LogRecordReader extends AbstractRecordReader { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class); + private static final int MAX_RECORDS_PER_BATCH = 8096; + + private String inputPath; + private BufferedReader reader; + private DrillBuf buffer; + private VectorContainerWriter writer; + private LogFormatPlugin.LogFormatConfig config; + private int lineCount; + private Pattern r; + + private List<String> fieldNames; + private List<String> dataTypes; + private boolean errorOnMismatch; + private String dateFormat; + private String timeFormat; + private java.text.DateFormat df; + private java.text.DateFormat tf; + private long time; + + public 
LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem, + List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException { + try { + Path hdfsPath = new Path(inputPath); + Configuration conf = new Configuration(); + FSDataInputStream fsStream = fileSystem.open(hdfsPath); + CompressionCodecFactory factory = new CompressionCodecFactory(conf); + CompressionCodec codec = factory.getCodec(hdfsPath); + if (codec == null) { + reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8")); + } else { + CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream()); + reader = new BufferedReader(new InputStreamReader(comInputStream)); + } + this.inputPath = inputPath; + this.lineCount = 0; + this.config = config; + this.buffer = fragmentContext.getManagedBuffer(4096); + setColumns(columns); + + } catch (IOException e) { + logger.debug("Log Reader Plugin: " + e.getMessage()); + } + } + + public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException { + this.writer = new VectorContainerWriter(output); + String regex = config.getPattern(); + + + fieldNames = config.getFieldNames(); + dataTypes = config.getDataTypes(); + dateFormat = config.getDateFormat(); + timeFormat = config.getTimeFormat(); + errorOnMismatch = config.getErrorOnMismatch(); + + /* + This section will check for; + 1. Empty regex + 2. Invalid Regex + 3. Empty date string if the date format is used + 4. No capturing groups in the regex + 5. Incorrect number of data types + 6. Invalid data types + */ --- End diff -- Suggestion: the computer will execute this code just as fast if it is broken into methods, but humans can understand it more easily if it is implemented as a `validateConfig()` method that does what its name suggests by calling `validateRegex()`, `validateDate()`, and so on.
---