paul-rogers commented on a change in pull request #1749: DRILL-7177: Format 
Plugin for Excel Files
URL: https://github.com/apache/drill/pull/1749#discussion_r336789362
 
 

 ##########
 File path: 
contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
 ##########
 @@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.excel;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.poi.ss.usermodel.Cell;
+import org.apache.poi.ss.usermodel.CellValue;
+import org.apache.poi.ss.usermodel.DateUtil;
+import org.apache.poi.ss.usermodel.FormulaEvaluator;
+import org.apache.poi.ss.usermodel.Row;
+import org.apache.poi.xssf.usermodel.XSSFSheet;
+import org.apache.poi.xssf.usermodel.XSSFWorkbook;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.joda.time.Instant;
+import java.util.Iterator;
+import java.io.IOException;
+import java.util.ArrayList;
+
+public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
+  private ExcelReaderConfig readerConfig; // Reader options; assigned in the constructor.
+  // Class logger; also handed to UserException builders via build(logger).
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ExcelBatchReader.class);
+  // Replacement substituted for PARSER_WILDCARD when it occurs in a header name.
+  private static final String SAFE_WILDCARD = "_$";
+  // Replacement substituted for "." in header names.
+  private static final String SAFE_SEPARATOR = "_";
+  // Character sequence in header names that is swapped for SAFE_WILDCARD.
+  private static final String PARSER_WILDCARD = ".*";
+  // Replacement substituted for newline characters in header names.
+  private static final String HEADER_NEW_LINE_REPLACEMENT = "__";
+  // Prefix of generated column names (field_1, field_2, ...) used when there is no header row.
+  private static final String MISSING_FIELD_NAME_HEADER = "field_";
+  // Sheet selected by getSheet(); set while opening the file.
+  private XSSFSheet sheet;
+  // Workbook parsed from the input stream in openFile(); closed in close().
+  private XSSFWorkbook workbook;
+  // Stream opened on the split's path in openFile().
+  private FSDataInputStream fsStream;
+  // Evaluator created from the workbook; every cell is read through it.
+  private FormulaEvaluator evaluator;
+  // Column names taken from the header row, or generated when there is none.
+  private ArrayList<String> excelFieldNames;
+  // Column writers, appended by addColumnToArray() while the first data row is processed.
+  private ArrayList<ScalarWriter> columnWriters;
+  // Iterator over the rows of the selected sheet.
+  private Iterator<Row> rowIterator;
+  // Row writer obtained from the result set loader in open().
+  private RowSetLoader rowWriter;
+  // Value of getLastCellNum() for the header row (or for the first data row if no header).
+  private int totalColumnCount;
+  // Number of sheet rows consumed from rowIterator so far (header-skip rows included).
+  private int lineCount;
+  // True until the first data row has been written; column writers are created on that row.
+  private boolean firstLine;
+  // File split supplied by the schema negotiator in open().
+  private FileSplit split;
+  // Result set loader built by the schema negotiator in open().
+  private ResultSetLoader loader;
+  // Rows saved since the start of the current next() call; reset to 0 by next().
+  private int recordCount;
+
+  /**
+   * Snapshot of the Excel format plugin's configuration values, read once from
+   * {@code plugin.getConfig()} when the object is constructed.
+   */
+  public static class ExcelReaderConfig {
+    /** The plugin these settings were read from. */
+    protected final ExcelFormatPlugin plugin;
+
+    /** Header row setting; the reader treats -1 as "no header row". */
+    protected final int headerRow;
+    /** Last row setting; the reader compares it against its per-call record count. */
+    protected final int lastRow;
+
+    /** First column setting; the reader treats 0 as "not set". */
+    protected final int firstColumn;
+    /** Last column setting; the reader treats 0 as "not set". */
+    protected final int lastColumn;
+
+    /** Value of the plugin's "read all fields as VARCHAR" option. */
+    protected final boolean readAllFieldsAsVarChar;
+
+    /** Name of the sheet to read; an empty name selects the first sheet. */
+    protected String sheetName;
+
+    /**
+     * Copies every reader-relevant option out of the plugin's configuration.
+     *
+     * @param plugin the Excel format plugin whose configuration is read
+     */
+    public ExcelReaderConfig(ExcelFormatPlugin plugin) {
+      this.plugin = plugin;
+      this.headerRow = plugin.getConfig().getHeaderRow();
+      this.lastRow = plugin.getConfig().getLastRow();
+      this.firstColumn = plugin.getConfig().getFirstColumn();
+      this.lastColumn = plugin.getConfig().getLastColumn();
+      this.readAllFieldsAsVarChar = plugin.getConfig().getReadAllFieldsAsVarChar();
+      this.sheetName = plugin.getConfig().getSheetName();
+    }
+  }
+
+  /**
+   * Creates a reader that has not yet seen its first data row.
+   *
+   * @param readerConfig the reader options to use for this scan
+   */
+  public ExcelBatchReader(ExcelReaderConfig readerConfig) {
+    // Column writers are created lazily while the first data row is processed.
+    this.firstLine = true;
+    this.readerConfig = readerConfig;
+  }
+
+  /**
+   * Validates the configuration, obtains the split, loader and row writer from the
+   * negotiator, opens the workbook and reads the column headers.
+   *
+   * @param negotiator schema negotiator supplied by the scan framework
+   * @return always {@code true}
+   */
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    // Reject inconsistent options before touching the file.
+    verifyConfigOptions();
+
+    this.split = negotiator.split();
+    this.loader = negotiator.build();
+    this.rowWriter = this.loader.writer();
+
+    // The file must be open before the header row can be read.
+    openFile(negotiator);
+    defineSchema();
+
+    return true;
+  }
+
+  /**
+   * Opens the file named by the current split as an Excel workbook, creates the formula
+   * evaluator and selects the sheet to read.
+   *
+   * @param negotiator schema negotiator that supplies the file system used to open the file
+   * @throws UserException a data read error if the file cannot be opened or parsed
+   */
+  private void openFile(FileScanFramework.FileSchemaNegotiator negotiator) {
+    try {
+      fsStream = negotiator.fileSystem().open(split.getPath());
+      workbook = new XSSFWorkbook(fsStream.getWrappedStream());
+    } catch (Exception e) {
+      // Use a single message that names the file. The underlying failure is attached as
+      // context rather than through a second message() call competing with the first.
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: %s", split.getPath().toString())
+        .addContext("Cause", e.toString())
+        .build(logger);
+    }
+
+    // Evaluate formulae
+    evaluator = workbook.getCreationHelper().createFormulaEvaluator();
+
+    // Missing cells are returned as blank cells instead of null when rows are read.
+    workbook.setMissingCellPolicy(Row.MissingCellPolicy.CREATE_NULL_AS_BLANK);
+    sheet = getSheet();
+  }
+
+  /**
+   * Builds the schema described by the sheet's header row.
+   *
+   * @return TupleMetadata of the discovered schema
+   */
+  private TupleMetadata defineSchema() {
+    return getColumnHeaders(new SchemaBuilder());
+  }
+
+  /**
+   * Collects the column names for the sheet, adds one nullable column per name to the
+   * builder, and resets the list of column writers.
+   *
+   * <p>When the configured header row is -1, names of the form {@code field_n} are generated
+   * from the cell count of the first row. Otherwise rows are skipped up to the header row and
+   * every column position of that row yields a name: text cells are sanitized, numeric cells
+   * use their numeric value, and any other cell (blank, boolean, error) gets a generated
+   * {@code field_n} name so that names stay aligned with column positions.
+   *
+   * @param builder the schema builder that receives the columns
+   * @return the schema built from {@code builder}
+   * @throws UserException a data read error if the sheet ends before the header row
+   */
+  protected TupleMetadata getColumnHeaders(SchemaBuilder builder) {
+    excelFieldNames = new ArrayList<>();
+    rowIterator = sheet.iterator();
+
+    if (readerConfig.headerRow == -1) {
+      // If there are no headers, create column names of field_n
+      Row firstRow = sheet.getRow(0);
+      int columnCount = (firstRow == null) ? 0 : firstRow.getPhysicalNumberOfCells();
+      for (int i = 0; i < columnCount; i++) {
+        String missingFieldName = MISSING_FIELD_NAME_HEADER + (i + 1);
+        makeColumn(builder, missingFieldName, TypeProtos.MinorType.VARCHAR);
+        excelFieldNames.add(missingFieldName);
+      }
+    } else if (rowIterator.hasNext()) {
+      // Find the header row, stopping early if the sheet runs out of rows
+      while (this.lineCount < readerConfig.headerRow && rowIterator.hasNext()) {
+        rowIterator.next();
+        this.lineCount++;
+      }
+      if (!rowIterator.hasNext()) {
+        throw UserException
+          .dataReadError()
+          .message("Header row %d was not found in the sheet", readerConfig.headerRow)
+          .build(logger);
+      }
+
+      // Get the header row and column count
+      Row row = rowIterator.next();
+      this.totalColumnCount = row.getLastCellNum();
+
+      // Walk the header by column position rather than with the cell iterator, which skips
+      // physically missing cells and would shift later names to the wrong index.
+      for (int colPosition = 0; colPosition < totalColumnCount; colPosition++) {
+        Cell cell = row.getCell(colPosition);
+        CellValue cellValue = evaluator.evaluate(cell);
+
+        String columnName;
+        TypeProtos.MinorType columnType = TypeProtos.MinorType.VARCHAR;
+        if (cellValue == null) {
+          // The evaluator yields no value for this cell: fall back to a generated name
+          columnName = MISSING_FIELD_NAME_HEADER + (colPosition + 1);
+        } else {
+          switch (cellValue.getCellTypeEnum()) {
+            case STRING:
+              columnName = cell.getStringCellValue()
+                .replace(PARSER_WILDCARD, SAFE_WILDCARD)
+                .replaceAll("\\.", SAFE_SEPARATOR)
+                .replaceAll("\\n", HEADER_NEW_LINE_REPLACEMENT);
+              break;
+            case NUMERIC:
+              columnName = String.valueOf(cell.getNumericCellValue());
+              columnType = TypeProtos.MinorType.FLOAT8;
+              break;
+            default:
+              // Boolean, error, etc.: keep the position occupied with a generated name
+              columnName = MISSING_FIELD_NAME_HEADER + (colPosition + 1);
+              break;
+          }
+        }
+        makeColumn(builder, columnName, columnType);
+        excelFieldNames.add(columnName);
+      }
+    }
+
+    columnWriters = new ArrayList<ScalarWriter>(excelFieldNames.size());
+    return builder.buildSchema();
+  }
+
+  /**
+   * Looks up the sheet named in the configuration; an empty name selects the first sheet.
+   *
+   * @return XSSFSheet The selected sheet
+   * @throws UserException a validation error if the workbook has no sheet with that name
+   */
+  private XSSFSheet getSheet() {
+    String requestedName = readerConfig.sheetName;
+    int sheetIndex = requestedName.isEmpty() ? 0 : workbook.getSheetIndex(requestedName);
+
+    if (sheetIndex != -1) {
+      return workbook.getSheetAt(sheetIndex);
+    }
+
+    // The workbook reported -1 for the requested name
+    throw UserException
+      .validationError()
+      .message("Could not open sheet " + readerConfig.sheetName)
+      .build(logger);
+  }
+
+  @Override
+  public boolean next() {
+    recordCount = 0;
+    while (!rowWriter.isFull()) {
+      if (!nextLine(rowWriter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Copies rows from the sheet into the row writer until the batch is full, the configured
+   * last row is reached, or the sheet has no more rows.
+   *
+   * <p>Column writers are created while the first data row is processed; the type of each
+   * column is taken from the value found in that row.
+   *
+   * @param rowWriter the row writer for the current batch
+   * @return {@code false} if, on entry, the sheet is exhausted or the row limit has been
+   *         reached; {@code true} otherwise
+   * @throws UserException a data read error if a row has fewer columns than expected
+   */
+  public boolean nextLine(RowSetLoader rowWriter) {
+    if (!rowIterator.hasNext() || recordCount >= readerConfig.lastRow) {
+      return false;
+    }
+
+    int lastRow = readerConfig.lastRow;
+    // Stop as soon as the batch is full; next() calls back in for the following batch.
+    while (recordCount < lastRow && rowIterator.hasNext() && !rowWriter.isFull()) {
+      lineCount++;
+
+      Row row = rowIterator.next();
+      // If the user specified that there are no headers, get the column count
+      if (readerConfig.headerRow == -1 && recordCount == 0) {
+        this.totalColumnCount = row.getLastCellNum();
+      }
+
+      if (row.getLastCellNum() < totalColumnCount) {
+        throw UserException
+          .dataReadError()
+          .message("Wrong number of columns in row: %d", row.getLastCellNum())
+          .build(logger);
+      }
+
+      // A first/last column of 0 means "not set"; otherwise the values are used 1-based.
+      int colPosition = (readerConfig.firstColumn != 0) ? readerConfig.firstColumn - 1 : 0;
+      // NOTE(review): with lastColumn - 1 as the exclusive bound, firstColumn == lastColumn
+      // selects no columns at all -- confirm whether lastColumn is meant to be inclusive.
+      int finalColumn = (readerConfig.lastColumn != 0)
+        ? readerConfig.lastColumn - 1
+        : totalColumnCount;
+
+      rowWriter.start();
+      for (int colWriterIndex = 0; colPosition < finalColumn; colPosition++, colWriterIndex++) {
+        writeCell(rowWriter, row.getCell(colPosition), colPosition, colWriterIndex);
+      }
+
+      firstLine = false;
+      rowWriter.save();
+      recordCount++;
+    }
+    return true;
+  }
+
+  /**
+   * Writes one cell into the column writer at {@code colWriterIndex}, creating the column
+   * first when the row being processed is the first data row.
+   *
+   * @param rowWriter the row writer that owns the columns
+   * @param cell the cell to read
+   * @param colPosition index of the cell's column name in {@code excelFieldNames}
+   * @param colWriterIndex index of the column's writer in {@code columnWriters}
+   */
+  private void writeCell(RowSetLoader rowWriter, Cell cell, int colPosition, int colWriterIndex) {
+    CellValue cellValue = evaluator.evaluate(cell);
+
+    if (cellValue == null) {
+      // No evaluated value: stored as an empty string
+      if (firstLine) {
+        addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR);
+      }
+      columnWriters.get(colWriterIndex).setString("");
+      return;
+    }
+
+    switch (cellValue.getCellTypeEnum()) {
+      case NUMERIC:
+        if (DateUtil.isCellDateFormatted(cell)) {
+          Instant timeStamp = new Instant(cell.getDateCellValue().getTime());
+          if (firstLine) {
+            addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.TIMESTAMP);
+          }
+          columnWriters.get(colWriterIndex).setTimestamp(timeStamp);
+        } else {
+          double fieldNumValue = cell.getNumericCellValue();
+          if (firstLine) {
+            addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.FLOAT8);
+          }
+          columnWriters.get(colWriterIndex).setDouble(fieldNumValue);
+        }
+        break;
+      case STRING:
+        // getStringValue() is the text that formatAsString() wraps in double quotes
+        String fieldValue = cellValue.getStringValue();
+        if (firstLine) {
+          addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR);
+        }
+        columnWriters.get(colWriterIndex).setString(fieldValue);
+        break;
+      default:
+        // Unsupported cell type (boolean, error, ...): no value is written, so the column
+        // stays unset for this row. On the first row a writer must still be registered,
+        // otherwise every later column would be shifted onto the wrong writer.
+        if (firstLine) {
+          addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR);
+        }
+        break;
+    }
+  }
+
+  /**
+   * Adds a nullable column to the schema builder after checking that its type is one the
+   * Excel reader handles.
+   *
+   * @param builder the schema builder to add the column to
+   * @param name the column name
+   * @param type the column type; must be VARCHAR, FLOAT8, DATE, TIMESTAMP or TIME
+   * @throws UserException a validation error for any other type
+   */
+  private void makeColumn(SchemaBuilder builder, String name, TypeProtos.MinorType type) {
+    // The Excel Reader only Supports Strings, Floats and Date/Times
+    boolean supported = type == TypeProtos.MinorType.VARCHAR
+      || type == TypeProtos.MinorType.FLOAT8
+      || type == TypeProtos.MinorType.DATE
+      || type == TypeProtos.MinorType.TIMESTAMP
+      || type == TypeProtos.MinorType.TIME;
+
+    if (!supported) {
+      throw UserException
+        .validationError()
+        .message("Undefined column types")
+        .addContext("Field name", name)
+        .addContext("Type", type.toString())
+        .build(logger);
+    }
+
+    builder.addNullable(name, type);
+  }
+
+  /**
+   * Adds an optional scalar column to the row writer and remembers its writer, unless the
+   * row writer's schema already has a column with that name, in which case nothing happens.
+   *
+   * @param rowWriter the tuple writer to add the column to
+   * @param name the column name
+   * @param type the column type
+   */
+  private void addColumnToArray(TupleWriter rowWriter, String name, TypeProtos.MinorType type) {
+    boolean alreadyDefined = rowWriter.tupleSchema().index(name) != -1;
+    if (alreadyDefined) {
+      return;
+    }
+
+    ColumnMetadata colSchema =
+      MetadataUtils.newScalar(name, type, TypeProtos.DataMode.OPTIONAL);
+    int newIndex = rowWriter.addColumn(colSchema);
+    columnWriters.add(rowWriter.scalar(newIndex));
+  }
+
+  /**
+   * Checks the reader options for values that cannot describe a valid read.
+   *
+   * <p>Negative column indexes are rejected first, so that a negative last column is reported
+   * as such rather than as "first column greater than last column". A last column of 0 is
+   * treated as "not set" and skips the ordering check.
+   *
+   * @throws UserException a validation error describing the first invalid option found
+   */
+  private void verifyConfigOptions() {
+    if (readerConfig.firstColumn < 0) {
+      throw UserException
+        .validationError()
+        .message("Invalid value for first column.  Index must not be negative.")
+        .build(logger);
+    }
+
+    if (readerConfig.lastColumn < 0) {
+      throw UserException
+        .validationError()
+        .message("Invalid value for last column.  Index must not be negative.")
+        .build(logger);
+    }
+
+    // Only compare the two when a last column was actually configured
+    if (readerConfig.lastColumn != 0 && readerConfig.lastColumn < readerConfig.firstColumn) {
+      throw UserException
+        .validationError()
+        .message("Invalid column configuration.  The first column index is greater than the last column index.")
+        .build(logger);
+    }
+
+    if (readerConfig.headerRow > readerConfig.lastRow) {
+      throw UserException
+        .validationError()
+        .message("Invalid value for headerRow.  Header row must be less than last row.")
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (workbook != null) {
+      try {
+        workbook.close();
+      } catch (IOException e) {
+        logger.warn("Error when closing XSSFWorkbook resource: {}, 
e.getMessage())");
+        throw UserException
 
 Review comment:
   See comment below.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to