[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-06 Thread cgivre
GitHub user cgivre opened a pull request:

https://github.com/apache/drill/pull/1114

Drill-6104: Added Logfile Reader

I would like to submit a format plugin that will enable Drill to read log 
files. Here is a link to the GitHub repo, which contains documentation: 
https://github.com/cgivre/drill-logfile-plugin



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cgivre/drill format-logfile

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/1114.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1114


commit 20b9f185ae0f3d5e600813668345430c50984b0c
Author: cgivre 
Date:   2018-02-06T13:34:45Z

Added Logfile Reader




---


[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167131092
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
@@ -0,0 +1,261 @@
+package org.apache.drill.exec.store.log;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class LogRecordReader extends AbstractRecordReader {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
+  private static final int MAX_RECORDS_PER_BATCH = 8096;
+
+  private String inputPath;
+  private BufferedReader reader;
+  private DrillBuf buffer;
+  private VectorContainerWriter writer;
+  private LogFormatPlugin.LogFormatConfig config;
+  private int lineCount;
+  private Pattern r;
+
+  private List<String> fieldNames;
+  private List<String> dataTypes;
+  private boolean errorOnMismatch;
+  private String dateFormat;
+  private String timeFormat;
+  private java.text.DateFormat df;
+  private java.text.DateFormat tf;
+  private long time;
+
+  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
+                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
+    try {
+      Path hdfsPath = new Path(inputPath);
+      Configuration conf = new Configuration();
+      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
+      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+      CompressionCodec codec = factory.getCodec(hdfsPath);
+      if (codec == null) {
+        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
+      } else {
+        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
+        reader = new BufferedReader(new InputStreamReader(comInputStream));
+      }
+      this.inputPath = inputPath;
+      this.lineCount = 0;
+      this.config = config;
+      this.buffer = fragmentContext.getManagedBuffer(4096);
+      setColumns(columns);
+
+    } catch (IOException e) {
+      logger.debug("Log Reader Plugin: " + e.getMessage());
+    }
+  }
+
+  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+    this.writer = new VectorContainerWriter(output);
+    String regex = config.getPattern();
+
+    fieldNames = config.getFieldNames();
+    dataTypes = config.getDataTypes();
+    dateFormat = config.getDateFormat();
+    timeFormat = config.getTimeFormat();
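
For readers skimming the quoted hunk: the setup above configures a regex whose capture groups are mapped, in order, to the configured field names. A minimal standalone sketch of that idea follows; the sample pattern and field names are invented for illustration, not the plugin's defaults.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: map the regex's numbered capture groups, in order,
// to a configured list of field names (the core idea behind the plugin's
// pattern/fieldNames config). Pattern and field names here are made up.
public class LogLineSketch {

  // Parse one log line; returns an empty map when the line does not match.
  public static Map<String, String> parse(String line, Pattern p, List<String> fields) {
    Matcher m = p.matcher(line);
    Map<String, String> row = new LinkedHashMap<>();
    if (m.matches()) {
      for (int i = 0; i < fields.size() && i < m.groupCount(); i++) {
        row.put(fields.get(i), m.group(i + 1)); // groups are 1-based
      }
    }
    return row;
  }

  public static void main(String[] args) {
    Pattern p = Pattern.compile("(\\d{4}-\\d{2}-\\d{2}) (\\w+) (.*)");
    List<String> fields = Arrays.asList("date", "level", "message");
    System.out.println(parse("2018-02-06 INFO started", p, fields));
  }
}
```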

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167130905
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
(quotes the same LogRecordReader.java hunk shown in the first review comment above)

---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167131126
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
(quotes the same LogRecordReader.java hunk shown in the first review comment above)

---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167129510
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
(quotes the same LogRecordReader.java hunk shown in the first review comment above)

---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167130781
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
(quotes the same LogRecordReader.java hunk shown in the first review comment above)

---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167130581
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
(quotes the same LogRecordReader.java hunk shown in the first review comment above)

---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167131250
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
 ---
@@ -0,0 +1,151 @@
+package org.apache.drill.exec.store.log;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+
+public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
+
+  private static final boolean IS_COMPRESSIBLE = true;
+  private static final String DEFAULT_NAME = "log";
+  private LogFormatConfig config;
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
+
+  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+    this(name, context, fsConf, storageConfig, new LogFormatConfig());
+  }
+
+  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+                         LogFormatConfig formatPluginConfig) {
+    super(name, context, fsConf, config, formatPluginConfig, true, false,
+        false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
+    this.config = formatPluginConfig;
+  }
+
+  @Override
+  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+                                      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+    return new LogRecordReader(context, fileWork.getPath(), dfs, columns, config);
+  }
+
+  @Override
+  public int getReaderOperatorType() {
+    return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public int getWriterOperatorType() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean supportsPushDown() {
+    return true;
--- End diff --

This claims that this plugin handles (projection) push-down, but no such 
code exists in the implementation.
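
For context on the review point: a reader that honors projection push-down populates only the columns the query projects, rather than every pattern group. A minimal standalone sketch of that behavior, with invented column and field names (this is not the plugin's actual code):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: what honoring projection push-down roughly means
// for a record reader. Given a fully parsed row and the query's projected
// column list, emit only the requested columns; "*" means all of them.
public class ProjectionSketch {

  public static Map<String, String> project(Map<String, String> row, List<String> projected) {
    if (projected.contains("*")) {
      return row; // SELECT * keeps every field
    }
    Map<String, String> out = new LinkedHashMap<>();
    for (String col : projected) {
      if (row.containsKey(col)) {
        out.put(col, row.get(col));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> row = new LinkedHashMap<>();
    row.put("date", "2018-02-06");
    row.put("level", "INFO");
    row.put("message", "started");

    // Only the projected column survives in the emitted row.
    System.out.println(project(row, Arrays.asList("level")));
  }
}
```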


---


[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167131709
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
@@ -0,0 +1,261 @@
+package org.apache.drill.exec.store.log;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class LogRecordReader extends AbstractRecordReader {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
+  private static final int MAX_RECORDS_PER_BATCH = 8096;
+
+  private String inputPath;
+  private BufferedReader reader;
+  private DrillBuf buffer;
+  private VectorContainerWriter writer;
+  private LogFormatPlugin.LogFormatConfig config;
+  private int lineCount;
+  private Pattern r;
+
+  private List<String> fieldNames;
+  private List<String> dataTypes;
+  private boolean errorOnMismatch;
+  private String dateFormat;
+  private String timeFormat;
+  private java.text.DateFormat df;
+  private java.text.DateFormat tf;
+  private long time;
+
+  public LogRecordReader(FragmentContext fragmentContext, String 
inputPath, DrillFileSystem fileSystem,
+ List<SchemaPath> columns, 
LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
+try {
+  Path hdfsPath = new Path(inputPath);
+  Configuration conf = new Configuration();
+  FSDataInputStream fsStream = fileSystem.open(hdfsPath);
+  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+  CompressionCodec codec = factory.getCodec(hdfsPath);
+  if (codec == null) {
+reader = new BufferedReader(new 
InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
+  } else {
+CompressionInputStream comInputStream = 
codec.createInputStream(fsStream.getWrappedStream());
+reader = new BufferedReader(new InputStreamReader(comInputStream));
+  }
+  this.inputPath = inputPath;
+  this.lineCount = 0;
+  this.config = config;
+  this.buffer = fragmentContext.getManagedBuffer(4096);
+  setColumns(columns);
+
+} catch (IOException e) {
+  logger.debug("Log Reader Plugin: " + e.getMessage());
+}
+  }
+
+  public void setup(final OperatorContext context, final OutputMutator 
output) throws ExecutionSetupException {
+this.writer = new VectorContainerWriter(output);
+String regex = config.getPattern();
+
+
+fieldNames = config.getFieldNames();
+dataTypes = config.getDataTypes();
+dateFormat = config.getDateFormat();
+timeForm
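
The setup code above builds `java.text.DateFormat` instances from the configured `dateFormat` and `timeFormat` strings. A minimal sketch of how a matched time string becomes a timestamp value, assuming the default `HH:mm:ss` format shown in the config (this is an illustration, not the PR's code):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class TimeFieldDemo {
  // Convert a matched time string to epoch milliseconds, the way a
  // TIME column could be populated from a regex capture group.
  static long toMillis(String value, String format) throws ParseException {
    SimpleDateFormat tf = new SimpleDateFormat(format);
    return tf.parse(value).getTime();
  }

  public static void main(String[] args) throws ParseException {
    long t = toMillis("13:34:45", "HH:mm:ss");
    // Round-trips through the same format (epoch day, local time zone).
    System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(t)));
  }
}
```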

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167131014
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167128873
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
 ---
@@ -0,0 +1,151 @@
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter 
writer) throws IOException {
+return null;
+  }
+
+  @JsonTypeName("log")
+  public static class LogFormatConfig implements FormatPluginConfig {
+public List<String> extensions;
+public List<String> fieldNames;
+public List<String> dataTypes;
+public String dateFormat = "";
+public String timeFormat = "HH:mm:ss";
+public String pattern;
+public Boolean errorOnMismatch = false;
+
+private static final List<String> DEFAULT_EXTS = 
ImmutableList.of("log");
+
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public List<String> getExtensions() {
+  if (extensions == null) {
+return DEFAULT_EXTS;
+  }
+  return extensions;
+}
+
+public List<String> getFieldNames() {
+  return fieldNames;
+}
+
+public List<String> getDataTypes() {
+  return dataTypes;
+}
+
+public String getDateFormat() {
+  
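
For reference, the Jackson-mapped fields above correspond to a format entry in the storage-plugin JSON configuration. A hedged sketch under that assumption (the pattern, field names, and types below are illustrative, not taken from the PR):

```json
"log": {
  "type": "log",
  "extensions": ["log"],
  "fieldNames": ["date", "time", "level", "message"],
  "dataTypes": ["DATE", "TIME", "VARCHAR", "VARCHAR"],
  "dateFormat": "yyyy-MM-dd",
  "timeFormat": "HH:mm:ss",
  "pattern": "(\\d{4}-\\d{2}-\\d{2}) (\\d{2}:\\d{2}:\\d{2}) (\\w+) (.*)",
  "errorOnMismatch": false
}
```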

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167129411
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167129981
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167128074
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
 ---
@@ -0,0 +1,151 @@
+package org.apache.drill.exec.store.log;
--- End diff --

Generally, the package statement goes below the copyright notice.


---


[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167128268
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
 ---
@@ -0,0 +1,151 @@
+private static final List<String> DEFAULT_EXTS = 
ImmutableList.of("log");
--- End diff --

To ensure this plugin is usable by default, maybe define a default layout 
that matches the entire line as a single column?
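
The suggestion above can be prototyped with a one-group pattern exposed as a single column; a sketch under that assumption (the fallback names are hypothetical):

```java
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DefaultLayoutDemo {
  // Hypothetical fallback layout: capture the whole line as one column,
  // so the plugin works even when no pattern is configured.
  static final String DEFAULT_PATTERN = "(.*)";
  static final List<String> DEFAULT_FIELDS = Collections.singletonList("line");

  static String firstColumn(String line) {
    Matcher m = Pattern.compile(DEFAULT_PATTERN).matcher(line);
    return m.matches() ? m.group(1) : null;
  }

  public static void main(String[] args) {
    System.out.println(DEFAULT_FIELDS.get(0) + " = " + firstColumn("any log line at all"));
    // prints line = any log line at all
  }
}
```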


---


[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167128552
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
 ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167132178
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
@@ -0,0 +1,261 @@
+package org.apache.drill.exec.store.log;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class LogRecordReader extends AbstractRecordReader {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
+  private static final int MAX_RECORDS_PER_BATCH = 8096;
+
+  private String inputPath;
+  private BufferedReader reader;
+  private DrillBuf buffer;
+  private VectorContainerWriter writer;
+  private LogFormatPlugin.LogFormatConfig config;
+  private int lineCount;
+  private Pattern r;
+
+  private List<String> fieldNames;
+  private List<String> dataTypes;
+  private boolean errorOnMismatch;
+  private String dateFormat;
+  private String timeFormat;
+  private java.text.DateFormat df;
+  private java.text.DateFormat tf;
+  private long time;
+
+  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
+      List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
+try {
+  Path hdfsPath = new Path(inputPath);
+  Configuration conf = new Configuration();
+  FSDataInputStream fsStream = fileSystem.open(hdfsPath);
+  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+  CompressionCodec codec = factory.getCodec(hdfsPath);
+  if (codec == null) {
+reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
+  } else {
+CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
+reader = new BufferedReader(new InputStreamReader(comInputStream));
+  }
+  this.inputPath = inputPath;
+  this.lineCount = 0;
+  this.config = config;
+  this.buffer = fragmentContext.getManagedBuffer(4096);
+  setColumns(columns);
+
+} catch (IOException e) {
+  logger.debug("Log Reader Plugin: " + e.getMessage());
+}
+  }
+
+  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+this.writer = new VectorContainerWriter(output);
+String regex = config.getPattern();
+
+
+fieldNames = config.getFieldNames();
+dataTypes = config.getDataTypes();
+dateFormat = config.getDateFormat();
+timeForm
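The `setup()` method above (truncated by the mail archiver) compiles the configured `pattern` and reads the field lists from the config. As a standalone sketch of the underlying technique, regex capturing groups mapped positionally to configured field names, here is a minimal parser; the class and method names (`LogLineParser`, `parse`) are illustrative, not part of the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLineParser {
    // One capturing group per configured field, matched positionally.
    private final Pattern pattern;
    private final List<String> fieldNames;

    public LogLineParser(String regex, List<String> fieldNames) {
        this.pattern = Pattern.compile(regex);
        this.fieldNames = fieldNames;
    }

    // Returns captured values in field order, or null when the line does not
    // match (analogous to what the plugin's errorOnMismatch flag would gate).
    public List<String> parse(String line) {
        Matcher m = pattern.matcher(line);
        if (!m.matches()) {
            return null;
        }
        List<String> values = new ArrayList<>();
        for (int i = 1; i <= m.groupCount(); i++) {
            values.add(m.group(i));
        }
        return values;
    }
}
```

Used with a pattern like `(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2}) (\w+) (.*)` and fields `date, time, level, message`, each matched line yields one value per group.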

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167129205
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
@@ -0,0 +1,261 @@
[...]
+  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
+      List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
+try {
+  Path hdfsPath = new Path(inputPath);
+  Configuration conf = new Configuration();
+  FSDataInputStream fsStream = fileSystem.open(hdfsPath);
+  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+  CompressionCodec codec = factory.getCodec(hdfsPath);
+  if (codec == null) {
+reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
+  } else {
+CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
+reader = new BufferedReader(new InputStreamReader(comInputStream));
--- End diff --

Probably don't want to do this here. The `RecordReader` protocol is kind of 
awkward. Suppose you scan 1000 log files in a single thread. The Scan operator 
will create 1000 `RecordReader` instances, then process them one by one. We 
really don't want to open 1000 files at the same time. So, in the constructor, 
just get ready, but defer file opening until the `setup()` call.


---
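The lazy-open lifecycle the reviewer asks for can be sketched outside Drill. This is a hedged stand-in, not Drill's API: `LazyLineReader`, `setup()`, and `isOpen()` are hypothetical names; the point is that the constructor only records state and all I/O is deferred until `setup()`:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// The scan operator may construct one reader per file up front, so the
// constructor must be cheap; the file is opened only when setup() runs.
public class LazyLineReader implements AutoCloseable {
    private final Path inputPath;
    private BufferedReader reader;  // null until setup()

    public LazyLineReader(Path inputPath) {
        this.inputPath = inputPath;  // no I/O here
    }

    public void setup() throws IOException {
        reader = Files.newBufferedReader(inputPath);  // open on demand
    }

    public String nextLine() throws IOException {
        return reader.readLine();
    }

    public boolean isOpen() {
        return reader != null;
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }
}
```

With 1000 such readers constructed, no file descriptors are consumed until each reader's `setup()` is actually invoked.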


[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167130507
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167129450
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167131367
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167131439
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167132395
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167131536
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
@@ -0,0 +1,261 @@
+package org.apache.drill.exec.store.log;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class LogRecordReader extends AbstractRecordReader {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
+  private static final int MAX_RECORDS_PER_BATCH = 8096;
+
+  private String inputPath;
+  private BufferedReader reader;
+  private DrillBuf buffer;
+  private VectorContainerWriter writer;
+  private LogFormatPlugin.LogFormatConfig config;
+  private int lineCount;
+  private Pattern r;
+
+  private List<String> fieldNames;
+  private List<String> dataTypes;
+  private boolean errorOnMismatch;
+  private String dateFormat;
+  private String timeFormat;
+  private java.text.DateFormat df;
+  private java.text.DateFormat tf;
+  private long time;
+
+  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
+                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
+try {
+  Path hdfsPath = new Path(inputPath);
+  Configuration conf = new Configuration();
+  FSDataInputStream fsStream = fileSystem.open(hdfsPath);
+  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+  CompressionCodec codec = factory.getCodec(hdfsPath);
+  if (codec == null) {
+reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
+  } else {
+CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
+reader = new BufferedReader(new InputStreamReader(comInputStream));
+  }
+  this.inputPath = inputPath;
+  this.lineCount = 0;
+  this.config = config;
+  this.buffer = fragmentContext.getManagedBuffer(4096);
+  setColumns(columns);
+
+} catch (IOException e) {
+  logger.debug("Log Reader Plugin: " + e.getMessage());
+}
+  }
+
+  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+this.writer = new VectorContainerWriter(output);
+String regex = config.getPattern();
+
+
+fieldNames = config.getFieldNames();
+dataTypes = config.getDataTypes();
+dateFormat = config.getDateFormat();
+timeForm
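setup() goes on to compile config.getPattern() and pair the pattern's capture groups with the configured field names, one group per field. A self-contained sketch of that pairing under the same contract (the LogLineParser class, sample pattern, and field names are hypothetical, not part of the plugin):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLineParser {
  // One capture group per configured field, in order -- the same contract the
  // plugin's LogFormatConfig imposes on fieldNames versus the regex pattern.
  static Map<String, String> parse(Pattern p, List<String> fields, String line) {
    Matcher m = p.matcher(line);
    if (!m.matches() || m.groupCount() != fields.size()) {
      return null; // caller decides whether a mismatch skips the line or errors
    }
    Map<String, String> row = new LinkedHashMap<>();
    for (int i = 0; i < fields.size(); i++) {
      row.put(fields.get(i), m.group(i + 1)); // group 0 is the whole match
    }
    return row;
  }

  public static void main(String[] args) {
    Pattern p = Pattern.compile("(\\d{4}-\\d{2}-\\d{2}) (\\w+) (.*)");
    List<String> fields = Arrays.asList("date", "level", "message");
    System.out.println(parse(p, fields, "2018-02-06 INFO Added Logfile Reader"));
  }
}
```

Here a non-matching line or a group-count mismatch simply returns null; the real reader presumably consults its `errorOnMismatch` flag to decide between skipping the line and failing the query.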

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167130388
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167132068
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r167131794
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-17 Thread cgivre
Github user cgivre commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r168939597
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-26 Thread cgivre
Github user cgivre commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r170674675
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-26 Thread cgivre
Github user cgivre commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r170674795
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-26 Thread cgivre
Github user cgivre commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r170675202
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
@@ -0,0 +1,151 @@
+package org.apache.drill.exec.store.log;
--- End diff --

Fixed


---


[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-26 Thread cgivre
Github user cgivre commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r170675226
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
@@ -0,0 +1,151 @@
+package org.apache.drill.exec.store.log;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+
+public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
+
+  private static final boolean IS_COMPRESSIBLE = true;
+  private static final String DEFAULT_NAME = "log";
+  private LogFormatConfig config;
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
+
+  public LogFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf, StoragePluginConfig storageConfig) {
+this(name, context, fsConf, storageConfig, new LogFormatConfig());
+  }
+
+  public LogFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf, StoragePluginConfig config, LogFormatConfig 
formatPluginConfig) {
+super(name, context, fsConf, config, formatPluginConfig, true, false, 
false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
+this.config = formatPluginConfig;
+  }
+
+  @Override
+  public RecordReader getRecordReader(FragmentContext context, 
DrillFileSystem dfs, FileWork fileWork,
+  List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+return new LogRecordReader(context, fileWork.getPath(), dfs, columns, 
config);
+  }
+
+  @Override
+  public int getReaderOperatorType() {
+return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public int getWriterOperatorType() {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean supportsPushDown() {
+return true;
+  }
+
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter 
writer) throws IOException {
+return null;
+  }
+
+  @JsonTypeName("log")
+  public static class LogFormatConfig implements FormatPluginConfig {
+public List<String> extensions;
+public List<String> fieldNames;
+public List<String> dataTypes;
+public String dateFormat = "";
+public String timeFormat = "HH:mm:ss";
+public String pattern;
+public Boolean errorOnMismatch = false;
+
+private static final List<String> DEFAULT_EXTS = ImmutableList.of("log");
--- End diff --

I added a default regex of `(.*)`, which captures the entire line, and a 
default field name of `full_line`.


---
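As a sanity check on the default described above (this is an editor's sketch, not code from the PR), the following shows how a `(.*)` pattern maps a whole line to a single field. The `full_line` field name is taken from the comment; the `parse` helper and its map-based row are illustrative assumptions:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DefaultRegexSketch {
    // Assumed defaults from the discussion above: one capture group,
    // one configured field name.
    static final Pattern DEFAULT_PATTERN = Pattern.compile("(.*)");
    static final String[] DEFAULT_FIELDS = {"full_line"};

    // Map each regex capture group to its configured field name.
    static Map<String, String> parse(String line) {
        Map<String, String> row = new LinkedHashMap<>();
        Matcher m = DEFAULT_PATTERN.matcher(line);
        if (m.matches()) {
            for (int i = 1; i <= m.groupCount(); i++) {
                row.put(DEFAULT_FIELDS[i - 1], m.group(i));
            }
        }
        return row;
    }

    public static void main(String[] args) {
        // Prints the input line unchanged, under the key "full_line".
        System.out.println(parse("2018-02-26 INFO something happened").get("full_line"));
    }
}
```

Any line (with no embedded newline) matches, so even unconfigured plugins return each record as one varchar column.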


[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-26 Thread cgivre
Github user cgivre commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r170675294
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
 ---
@@ -0,0 +1,151 @@
+package org.apache.drill.exec.store.log;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+
+public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
+
+  private static final boolean IS_COMPRESSIBLE = true;
+  private static final String DEFAULT_NAME = "log";
+  private LogFormatConfig config;
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
+
+  public LogFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf, StoragePluginConfig storageConfig) {
+this(name, context, fsConf, storageConfig, new LogFormatConfig());
+  }
+
+  public LogFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf, StoragePluginConfig config, LogFormatConfig 
formatPluginConfig) {
+super(name, context, fsConf, config, formatPluginConfig, true, false, 
false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
+this.config = formatPluginConfig;
+  }
+
+  @Override
+  public RecordReader getRecordReader(FragmentContext context, 
DrillFileSystem dfs, FileWork fileWork,
+  List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+return new LogRecordReader(context, fileWork.getPath(), dfs, columns, 
config);
+  }
+
+  @Override
+  public int getReaderOperatorType() {
+return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public int getWriterOperatorType() {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean supportsPushDown() {
+return true;
+  }
+
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter 
writer) throws IOException {
+return null;
+  }
+
+  @JsonTypeName("log")
+  public static class LogFormatConfig implements FormatPluginConfig {
+public List<String> extensions;
+public List<String> fieldNames;
+public List<String> dataTypes;
+public String dateFormat = "";
+public String timeFormat = "HH:mm:ss";
+public String pattern;
+public Boolean errorOnMismatch = false;
+
+private static final List<String> DEFAULT_EXTS = ImmutableList.of("log");
+
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public List<String> getExtensions() {
+  if (extensions == null) {
+return DEFAULT_EXTS;
+  }
+  return extensions;
+}
+
+public List<String> getFieldNames() {
+  return fieldNames;
+}
+
+public List<String> getDataTypes() {
+  return dataTypes;
+}
+
+public String getDateFormat() {
+  r

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-26 Thread cgivre
Github user cgivre commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r170675324
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
@@ -0,0 +1,261 @@
+package org.apache.drill.exec.store.log;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class LogRecordReader extends AbstractRecordReader {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
+  private static final int MAX_RECORDS_PER_BATCH = 8096;
+
+  private String inputPath;
+  private BufferedReader reader;
+  private DrillBuf buffer;
+  private VectorContainerWriter writer;
+  private LogFormatPlugin.LogFormatConfig config;
+  private int lineCount;
+  private Pattern r;
+
+  private List<String> fieldNames;
+  private List<String> dataTypes;
+  private boolean errorOnMismatch;
+  private String dateFormat;
+  private String timeFormat;
+  private java.text.DateFormat df;
+  private java.text.DateFormat tf;
+  private long time;
+
+  public LogRecordReader(FragmentContext fragmentContext, String 
inputPath, DrillFileSystem fileSystem,
+ List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
+try {
+  Path hdfsPath = new Path(inputPath);
+  Configuration conf = new Configuration();
+  FSDataInputStream fsStream = fileSystem.open(hdfsPath);
+  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+  CompressionCodec codec = factory.getCodec(hdfsPath);
+  if (codec == null) {
+reader = new BufferedReader(new 
InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
+  } else {
+CompressionInputStream comInputStream = 
codec.createInputStream(fsStream.getWrappedStream());
+reader = new BufferedReader(new InputStreamReader(comInputStream));
--- End diff --

Fixed


---
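The "Fixed" above refers to the quoted constructor, where the uncompressed branch passed "UTF-8" to `InputStreamReader` but the codec branch fell back to the platform default encoding. A minimal sketch of the likely fix (assumed, not copied from the updated branch): wrap whichever stream you end up with using one charset-explicit reader.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class CharsetReaderSketch {
    // Wrap any input stream (plain or codec-decompressed) with an explicit
    // charset, so decoding never depends on the JVM's default encoding.
    static BufferedReader open(InputStream in) {
        return new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("línea".getBytes(StandardCharsets.UTF_8));
        System.out.println(open(in).readLine()); // prints "línea" on any platform
    }
}
```

Using `StandardCharsets.UTF_8` also avoids the checked `UnsupportedEncodingException` that the `"UTF-8"` string constructor forces you to handle.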


[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-26 Thread cgivre
Github user cgivre commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r170675258
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
 ---
@@ -0,0 +1,151 @@
+package org.apache.drill.exec.store.log;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+
+public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
+
+  private static final boolean IS_COMPRESSIBLE = true;
+  private static final String DEFAULT_NAME = "log";
+  private LogFormatConfig config;
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
+
+  public LogFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf, StoragePluginConfig storageConfig) {
+this(name, context, fsConf, storageConfig, new LogFormatConfig());
+  }
+
+  public LogFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf, StoragePluginConfig config, LogFormatConfig 
formatPluginConfig) {
+super(name, context, fsConf, config, formatPluginConfig, true, false, 
false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
+this.config = formatPluginConfig;
+  }
+
+  @Override
+  public RecordReader getRecordReader(FragmentContext context, 
DrillFileSystem dfs, FileWork fileWork,
+  List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+return new LogRecordReader(context, fileWork.getPath(), dfs, columns, 
config);
+  }
+
+  @Override
+  public int getReaderOperatorType() {
+return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public int getWriterOperatorType() {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean supportsPushDown() {
+return true;
+  }
+
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter 
writer) throws IOException {
+return null;
+  }
+
+  @JsonTypeName("log")
+  public static class LogFormatConfig implements FormatPluginConfig {
+public List<String> extensions;
+public List<String> fieldNames;
+public List<String> dataTypes;
+public String dateFormat = "";
+public String timeFormat = "HH:mm:ss";
+public String pattern;
+public Boolean errorOnMismatch = false;
+
+private static final List<String> DEFAULT_EXTS = ImmutableList.of("log");
+
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public List<String> getExtensions() {
+  if (extensions == null) {
+return DEFAULT_EXTS;
+  }
+  return extensions;
+}
+
+public List<String> getFieldNames() {
+  return fieldNames;
+}
+
+public List<String> getDataTypes() {
+  return dataTypes;
+}
+
+public String getDateFormat() {
+  r

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-26 Thread cgivre
Github user cgivre commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r170675353
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 ---
@@ -0,0 +1,261 @@
+package org.apache.drill.exec.store.log;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class LogRecordReader extends AbstractRecordReader {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
+  private static final int MAX_RECORDS_PER_BATCH = 8096;
+
+  private String inputPath;
+  private BufferedReader reader;
+  private DrillBuf buffer;
+  private VectorContainerWriter writer;
+  private LogFormatPlugin.LogFormatConfig config;
+  private int lineCount;
+  private Pattern r;
+
+  private List<String> fieldNames;
+  private List<String> dataTypes;
+  private boolean errorOnMismatch;
+  private String dateFormat;
+  private String timeFormat;
+  private java.text.DateFormat df;
+  private java.text.DateFormat tf;
+  private long time;
+
+  public LogRecordReader(FragmentContext fragmentContext, String 
inputPath, DrillFileSystem fileSystem,
+ List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
+try {
+  Path hdfsPath = new Path(inputPath);
+  Configuration conf = new Configuration();
+  FSDataInputStream fsStream = fileSystem.open(hdfsPath);
+  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+  CompressionCodec codec = factory.getCodec(hdfsPath);
+  if (codec == null) {
+reader = new BufferedReader(new 
InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
+  } else {
+CompressionInputStream comInputStream = 
codec.createInputStream(fsStream.getWrappedStream());
+reader = new BufferedReader(new InputStreamReader(comInputStream));
+  }
+  this.inputPath = inputPath;
+  this.lineCount = 0;
+  this.config = config;
+  this.buffer = fragmentContext.getManagedBuffer(4096);
+  setColumns(columns);
+
+} catch (IOException e) {
+  logger.debug("Log Reader Plugin: " + e.getMessage());
+}
+  }
+
+  public void setup(final OperatorContext context, final OutputMutator 
output) throws ExecutionSetupException {
+this.writer = new VectorContainerWriter(output);
+String regex = config.getPattern();
+
+
+fieldNames = config.getFieldNames();
+dataTypes = config.getDataTypes();
+dateFormat = config.getDateFormat();
+timeFormat = 

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-02-26 Thread cgivre
Github user cgivre commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r170690424
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
 ---
@@ -0,0 +1,151 @@
+package org.apache.drill.exec.store.log;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+
+public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
+
+  private static final boolean IS_COMPRESSIBLE = true;
+  private static final String DEFAULT_NAME = "log";
+  private LogFormatConfig config;
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
+
+  public LogFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf, StoragePluginConfig storageConfig) {
+this(name, context, fsConf, storageConfig, new LogFormatConfig());
+  }
+
+  public LogFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf, StoragePluginConfig config, LogFormatConfig 
formatPluginConfig) {
+super(name, context, fsConf, config, formatPluginConfig, true, false, 
false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
+this.config = formatPluginConfig;
+  }
+
+  @Override
+  public RecordReader getRecordReader(FragmentContext context, 
DrillFileSystem dfs, FileWork fileWork,
+  List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+return new LogRecordReader(context, fileWork.getPath(), dfs, columns, 
config);
+  }
+
+  @Override
+  public int getReaderOperatorType() {
+return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public int getWriterOperatorType() {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean supportsPushDown() {
+return true;
--- End diff --

@paul-rogers Are there any examples of how to implement this?  I would like 
this to be as fast as possible.


---
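On the projection push-down question above: the core idea is that when `supportsPushDown()` returns true, the reader receives the projected columns (the `List<SchemaPath>` passed to `getRecordReader`) and can skip materializing regex capture groups that the query never asks for. A rough, non-authoritative sketch of that selection step — the helper name and plain-string column set are editor assumptions, not Drill's API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PushDownSketch {
    // Given the configured field names and the set of column names the query
    // projects, compute which capture-group indices must be materialized.
    static List<Integer> projectedGroups(List<String> fieldNames, Set<String> projected) {
        List<Integer> keep = new ArrayList<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            if (projected.contains(fieldNames.get(i))) {
                keep.add(i + 1); // regex capture groups are 1-based
            }
        }
        return keep;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("date", "level", "message");
        Set<String> wanted = new HashSet<>(Arrays.asList("level"));
        System.out.println(projectedGroups(fields, wanted)); // → [2]
    }
}
```

The regex still has to match the full line, so the speedup comes only from skipping per-group vector writes (and any date/time parsing) for unprojected fields, not from skipping the match itself.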


[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

2018-03-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1114#discussion_r175244984
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
 ---
@@ -0,0 +1,151 @@
+package org.apache.drill.exec.store.log;
--- End diff --

The comment says fixed. Did you forget to commit the changes to your 
branch? Github still shows the original code...


---