jnturton commented on code in PR #2725:
URL: https://github.com/apache/drill/pull/2725#discussion_r1053465233


##########
contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVRecordReader.java:
##########
@@ -37,34 +42,77 @@ public static void setup() throws Exception {
 
   @Test
   public void testWildcard() throws Exception {
-    testBuilder()
-      .sqlQuery("SELECT * FROM cp.`simple.ltsv`")
-      .unOrdered()
-      .baselineColumns("host", "forwardedfor", "req", "status", "size", 
"referer", "ua", "reqtime", "apptime", "vhost")
-      .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/xxx HTTP/1.1", "200", 
"4968", "-", "Java/1.8.0_131", "2.532", "2.532", "api.example.com")
-      .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/yyy HTTP/1.1", "200", 
"412", "-", "Java/1.8.0_201", "3.580", "3.580", "api.example.com")
-      .go();
+    String sql = "SELECT * FROM cp.`simple.ltsv`";

Review Comment:
   Let's rename this class to TestLTSVQueries or similar, now that
LTSVRecordReader is gone.



##########
contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVBatchReader.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.ltsv;
+
+import com.github.lolo.ltsv.LtsvParser;
+import com.github.lolo.ltsv.LtsvParser.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+
+public class LTSVBatchReader implements ManagedReader {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(LTSVBatchReader.class);
+  private final LTSVFormatPluginConfig config;
+  private final FileDescrip file;
+  private final CustomErrorContext errorContext;
+  private final LtsvParser ltsvParser;
+  private final RowSetLoader rowWriter;
+  private final FileSchemaNegotiator negotiator;
+  private InputStream fsStream;
+  private Iterator<Map<String, String>> rowIterator;
+
+
+  public LTSVBatchReader(LTSVFormatPluginConfig config, FileSchemaNegotiator 
negotiator) {
+    this.config = config;
+    this.negotiator = negotiator;
+    file = negotiator.file();
+    errorContext = negotiator.parentErrorContext();
+    ltsvParser = buildParser();
+
+    openFile();
+
+    // If there is a provided schema, import it
+    if (negotiator.providedSchema() != null) {
+      TupleMetadata schema = negotiator.providedSchema();
+      negotiator.tableSchema(schema, false);
+    }
+    ResultSetLoader loader = negotiator.build();
+    rowWriter = loader.writer();
+
+  }
+
+  private void openFile() {
+    try {
+      fsStream = 
file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
+    } catch (IOException e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Unable to open LTSV File %s", file.split().getPath() + " " 
+ e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+    }
+    rowIterator = ltsvParser.parse(fsStream);
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processNextRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private LtsvParser buildParser() {
+    Builder builder = LtsvParser.builder();
+    builder.trimKeys();
+    builder.trimValues();
+    builder.skipNullValues();
+
+    if (config.getParseMode().contentEquals("strict")) {
+      builder.strict();
+    } else {
+      builder.lenient();
+    }
+
+    if (StringUtils.isNotEmpty(config.getEscapeCharacter())) {
+      builder.withEscapeChar(config.getEscapeCharacter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getKvDelimiter())) {
+      builder.withKvDelimiter(config.getKvDelimiter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getEntryDelimiter())) {
+      builder.withEntryDelimiter(config.getEntryDelimiter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getLineEnding())) {
+      builder.withLineEnding(config.getLineEnding().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getQuoteChar())) {
+      builder.withQuoteChar(config.getQuoteChar().charAt(0));
+    }
+
+    return builder.build();
+  }
+
+  private boolean processNextRow() {
+    if (!rowIterator.hasNext()) {
+      return false;
+    }
+    // Start the row
+    String key;
+    String value;
+    int columnIndex;
+    ScalarWriter columnWriter;
+    Map<String, String> row = rowIterator.next();
+
+    // Skip empty lines
+    if (row.isEmpty()) {
+      return true;
+    }
+    rowWriter.start();
+    for (Map.Entry<String,String> field: row.entrySet()) {
+      key = field.getKey();
+      value = field.getValue();
+      columnIndex = getColumnIndex(key);
+      columnWriter = getColumnWriter(key);
+
+
+      if (negotiator.providedSchema() != null) {
+        // Check the type. LTSV will only read other data types if a schema is 
provided.
+        ColumnMetadata columnMetadata = 
rowWriter.tupleSchema().metadata(columnIndex);
+        MinorType dataType = columnMetadata.type();
+        LocalTime localTime;
+        LocalDate localDate;
+
+        switch (dataType) {
+          case BIT:
+            columnWriter.setBoolean(Boolean.parseBoolean(value));
+            break;
+          case INT:
+          case SMALLINT:
+          case TINYINT:
+            columnWriter.setInt(Integer.parseInt(value));
+            break;
+          case BIGINT:
+            columnWriter.setLong(Long.parseLong(value));
+            break;
+          case FLOAT8:
+          case FLOAT4:
+            columnWriter.setDouble(Double.parseDouble(value));
+            break;
+          case TIME:
+            columnMetadata = rowWriter.tupleSchema().metadata(key);
+            String dateFormat = columnMetadata.property("drill.format");
+
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              localTime = LocalTime.parse(value);
+            } else {
+              localTime = LocalTime.parse(value, 
DateTimeFormatter.ofPattern(dateFormat));
+            }
+            columnWriter.setTime(localTime);
+            break;
+          case DATE:
+            dateFormat = columnMetadata.property("drill.format");
+
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              localDate = LocalDate.parse(value);
+            } else {
+              localDate = LocalDate.parse(value, 
DateTimeFormatter.ofPattern(dateFormat));
+            }
+            columnWriter.setDate(localDate);
+            break;
+          case TIMESTAMP:
+            dateFormat = columnMetadata.property("drill.format");
+            Instant timestamp;
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              timestamp = Instant.parse(value);
+            } else {
+              try {
+                SimpleDateFormat simpleDateFormat = new 
SimpleDateFormat(dateFormat);
+                Date parsedDate = simpleDateFormat.parse(value);
+                timestamp = Instant.ofEpochMilli(parsedDate.getTime());
+              } catch (ParseException e) {
+                throw UserException.parseError(e)
+                    .message("Cannot parse " + value + " as a timestamp. You 
can specify a format string in the provided schema to correct this.")
+                    .addContext(errorContext)
+                    .build(logger);
+              }
+            }
+            columnWriter.setTimestamp(timestamp);
+            break;
+          default:
+            columnWriter.setString(value);
+        }
+      } else {
+        columnWriter.setString(value);
+      }
+    }
+    // Finish the row
+    rowWriter.save();
+    return true;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(fsStream);

Review Comment:
   Can I propose that we add a log line at the debug level here, because we're
freeing an operating system resource? We don't always log that, but I think we
should, because it can help someone poring over logs while chasing a
resource consumption problem.



##########
contrib/format-ltsv/pom.xml:
##########
@@ -36,6 +36,11 @@
       <artifactId>drill-java-exec</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.github.lonely-lockley</groupId>
+      <artifactId>ltsv-parser</artifactId>

Review Comment:
   Reviewer's note: this library is released under the Apache 2.0 license.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@drill.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to