GitHub user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1114#discussion_r167131126 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java --- @@ -0,0 +1,261 @@ +package org.apache.drill.exec.store.log; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.CompressionInputStream; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class LogRecordReader extends AbstractRecordReader { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class); + private static final int MAX_RECORDS_PER_BATCH = 8096; + + private String inputPath; + private BufferedReader reader; + private DrillBuf buffer; + private VectorContainerWriter writer; + private LogFormatPlugin.LogFormatConfig config; + private int lineCount; + private Pattern r; + + private List<String> fieldNames; + private List<String> dataTypes; + private boolean errorOnMismatch; + private String dateFormat; + private String timeFormat; + private java.text.DateFormat df; + private java.text.DateFormat tf; + private long time; + + public 
LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem, + List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException { + try { + Path hdfsPath = new Path(inputPath); + Configuration conf = new Configuration(); + FSDataInputStream fsStream = fileSystem.open(hdfsPath); + CompressionCodecFactory factory = new CompressionCodecFactory(conf); + CompressionCodec codec = factory.getCodec(hdfsPath); + if (codec == null) { + reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8")); + } else { + CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream()); + reader = new BufferedReader(new InputStreamReader(comInputStream)); + } + this.inputPath = inputPath; + this.lineCount = 0; + this.config = config; + this.buffer = fragmentContext.getManagedBuffer(4096); + setColumns(columns); + + } catch (IOException e) { + logger.debug("Log Reader Plugin: " + e.getMessage()); + } + } + + public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException { + this.writer = new VectorContainerWriter(output); + String regex = config.getPattern(); + + + fieldNames = config.getFieldNames(); + dataTypes = config.getDataTypes(); + dateFormat = config.getDateFormat(); + timeFormat = config.getTimeFormat(); + errorOnMismatch = config.getErrorOnMismatch(); + + /* + This section will check for; + 1. Empty regex + 2. Invalid Regex + 3. Empty date string if the date format is used + 4. No capturing groups in the regex + 5. Incorrect number of data types + 6. 
Invalid data types + */ + if (regex.isEmpty()) { + throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger); + } else { + //TODO Check for invalid regex + r = Pattern.compile(regex); + Matcher m = r.matcher("test"); + if (m.groupCount() == 0) { + throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger); + } else if (m.groupCount() != (fieldNames.size())) { + throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups. There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger); + + } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) { + //If the number of data types is not correct, create a list of varchar + dataTypes = new ArrayList<String>(); + for (int i = -0; i < m.groupCount(); i++) { + dataTypes.add("VARCHAR"); + } + } + } + + //Check and set up date formats + if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) { + if (dateFormat != null && !dateFormat.isEmpty()) { + df = new java.text.SimpleDateFormat(dateFormat); + } else { + throw UserException.parseError().message("Invalid date format. The date formatting string was empty. Please specify a valid date format string in the configuration for this data source.", 0).build(logger); + } + } + + if (dataTypes.contains("TIME")) { --- End diff -- As above.
---