arina-ielchiieva commented on a change in pull request #1749: DRILL-7177: 
Format Plugin for Excel Files
URL: https://github.com/apache/drill/pull/1749#discussion_r339010552
 
 

 ##########
 File path: 
contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
 ##########
 @@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.excel;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.poi.ss.usermodel.Cell;
+import org.apache.poi.ss.usermodel.CellType;
+import org.apache.poi.ss.usermodel.CellValue;
+import org.apache.poi.ss.usermodel.DateUtil;
+import org.apache.poi.ss.usermodel.FormulaEvaluator;
+import org.apache.poi.ss.usermodel.Row;
+import org.apache.poi.xssf.usermodel.XSSFSheet;
+import org.apache.poi.xssf.usermodel.XSSFWorkbook;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+
+import java.io.InputStream;
+import java.util.Iterator;
+import java.io.IOException;
+import java.util.ArrayList;
+
+public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
+  private ExcelReaderConfig readerConfig;
+
+  private static final Logger logger = 
org.slf4j.LoggerFactory.getLogger(ExcelBatchReader.class);
+
+  private static final String SAFE_WILDCARD = "_$";
+
+  private static final String SAFE_SEPARATOR = "_";
+
+  private static final String PARSER_WILDCARD = ".*";
+
+  private static final String HEADER_NEW_LINE_REPLACEMENT = "__";
+
+  private static final String MISSING_FIELD_NAME_HEADER = "field_";
+
+  private XSSFSheet sheet;
+
+  private XSSFWorkbook workbook;
+
+  private InputStream fsStream;
+
+  private FormulaEvaluator evaluator;
+
+  private ArrayList<String> excelFieldNames;
+
+  private ArrayList<ScalarWriter> columnWriters;
+
+  private ArrayList<CellType> cellTypes;
+
+  private ArrayList<CellWriter> cellWriterArray;
+
+  private Iterator<Row> rowIterator;
+
+  private RowSetLoader rowWriter;
+
+  private int totalColumnCount;
+
+  private int lineCount;
+
+  private boolean firstLine;
+
+  private FileSplit split;
+
+  private ResultSetLoader loader;
+
+  private int recordCount;
+
+  public static class ExcelReaderConfig {
+    final ExcelFormatPlugin plugin;
+
+    final int headerRow;
+
+    final int lastRow;
+
+    final int firstColumn;
+
+    final int lastColumn;
+
+    final boolean readAllFieldsAsVarChar;
+
+    final String sheetName;
+
+    public ExcelReaderConfig(ExcelFormatPlugin plugin) {
+      this.plugin = plugin;
+      headerRow = plugin.getConfig().getHeaderRow();
+      lastRow = plugin.getConfig().getLastRow();
+      firstColumn = plugin.getConfig().getFirstColumn();
+      lastColumn = plugin.getConfig().getLastColumn();
+      readAllFieldsAsVarChar = plugin.getConfig().getReadAllFieldsAsVarChar();
+      sheetName = plugin.getConfig().getSheetName();
+    }
+  }
+
+  public ExcelBatchReader(ExcelReaderConfig readerConfig) {
+    this.readerConfig = readerConfig;
+    firstLine = true;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    loader = negotiator.build();
+    rowWriter = loader.writer();
+    openFile(negotiator);
+    defineSchema();
+    return true;
+  }
+
+  private void openFile(FileScanFramework.FileSchemaNegotiator negotiator) {
+    try {
+      fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      workbook = new XSSFWorkbook(fsStream);
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open open input file: %s", 
split.getPath().toString())
+        .message(e.getMessage())
+        .build(logger);
+    }
+
+    // Evaluate formulae
+    evaluator = workbook.getCreationHelper().createFormulaEvaluator();
+
+    workbook.setMissingCellPolicy(Row.MissingCellPolicy.CREATE_NULL_AS_BLANK);
+    sheet = getSheet();
+  }
+
+  /**
+   * This function defines the schema from the header row.
+   * @return TupleMedata of the discovered schema
+   */
+  private TupleMetadata defineSchema() {
+    SchemaBuilder builder = new SchemaBuilder();
+    return getColumnHeaders(builder);
+  }
+
+  private TupleMetadata getColumnHeaders(SchemaBuilder builder) {
+    //Get the field names
+    int columnCount = 0;
+
+    // Case for empty sheet.
+    if (sheet.getFirstRowNum() == 0 && sheet.getLastRowNum() == 0) {
+      return builder.buildSchema();
+    }
+
+    // Get the number of columns.
+    columnCount = getColumnCount();
+
+    excelFieldNames = new ArrayList<>(columnCount);
+    cellWriterArray = new ArrayList<>(columnCount);
+    rowIterator = sheet.iterator();
+
+    //If there are no headers, create columns names of field_n
+    if (readerConfig.headerRow == -1) {
+      String missingFieldName;
+      for (int i = 0; i < columnCount; i++) {
+        missingFieldName = MISSING_FIELD_NAME_HEADER + (i + 1);
+        makeColumn(builder, missingFieldName, TypeProtos.MinorType.VARCHAR);
+        excelFieldNames.add(i, missingFieldName);
+      }
+      columnWriters = new ArrayList<>(excelFieldNames.size());
+      cellTypes = new ArrayList<>(excelFieldNames.size());
+
+      return builder.buildSchema();
+    } else if (rowIterator.hasNext()) {
+      //Find the header row
+      int firstHeaderRow = getFirstHeaderRow();
+
+      while (lineCount < firstHeaderRow) {
+        Row row = rowIterator.next();
+        lineCount++;
+      }
+      //Get the header row and column count
+      Row row = rowIterator.next();
+      totalColumnCount = row.getLastCellNum();
+      cellTypes = new ArrayList<>(totalColumnCount);
+
+      //Read the header row
+      Iterator<Cell> cellIterator = row.cellIterator();
+      int colPosition = 0;
+      String tempColumnName = "";
+
+      while (cellIterator.hasNext()) {
+        Cell cell = cellIterator.next();
+
+        CellValue cellValue = evaluator.evaluate(cell);
+        switch (cellValue.getCellTypeEnum()) {
+          case STRING:
+            tempColumnName = cell.getStringCellValue()
+              .replace(PARSER_WILDCARD, SAFE_WILDCARD)
+              .replaceAll("\\.", SAFE_SEPARATOR)
+              .replaceAll("\\n", HEADER_NEW_LINE_REPLACEMENT);
+            makeColumn(builder, tempColumnName, TypeProtos.MinorType.VARCHAR);
+            excelFieldNames.add(colPosition, tempColumnName);
+            cellTypes.add(CellType.STRING);
+            break;
+          case NUMERIC:
+            tempColumnName = String.valueOf(cell.getNumericCellValue());
+            makeColumn(builder, tempColumnName, TypeProtos.MinorType.FLOAT8);
+            excelFieldNames.add(colPosition, tempColumnName);
+            cellTypes.add(CellType.NUMERIC);
+            break;
+        }
+        colPosition++;
+      }
+    }
+    columnWriters = new ArrayList<>(excelFieldNames.size());
+    return builder.buildSchema();
+  }
+
+  /**
+   * Helper function to get the selected sheet from the configuration
+   *
+   * @return XSSFSheet The selected sheet
+   */
+  private XSSFSheet getSheet() {
+    int sheetIndex = 0;
+    if (!readerConfig.sheetName.isEmpty()) {
+      sheetIndex = workbook.getSheetIndex(readerConfig.sheetName);
+    }
+
+    //If the sheet name is not valid, throw user exception
+    if (sheetIndex == -1) {
+      throw UserException
+        .validationError()
+        .message("Could not open sheet " + readerConfig.sheetName)
+        .build(logger);
+    } else {
+      return workbook.getSheetAt(sheetIndex);
+    }
+  }
+
+  /**
+   * Returns the column count.  There are a few gotchas here in that we have 
to know the header row and count the physical number of cells
+   * in that row.  Since the user can define the header row,
+   * @return
+   */
+  private int getColumnCount() {
+    int columnCount;
+
+    if (readerConfig.headerRow >= 0) {
+      columnCount = 
sheet.getRow(sheet.getFirstRowNum()).getPhysicalNumberOfCells();
+    } else {
+      // Case for when the user defines the headerRow as -1 IE:  When there 
isn't a headerRow.
+      columnCount = sheet.getRow(0).getPhysicalNumberOfCells();
+    }
+    return columnCount;
+  }
+
+  @Override
+  public boolean next() {
+    recordCount = 0;
+    while (!rowWriter.isFull()) {
+      if (!nextLine(rowWriter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean nextLine(RowSetLoader rowWriter) {
+    if( sheet.getFirstRowNum() == 0 && sheet.getLastRowNum() == 0) {
+      // Case for empty sheet
+      return false;
+    } else if (!rowIterator.hasNext()) {
+      return false;
+    } else if (recordCount >= readerConfig.lastRow) {
+      return false;
+    }
+
+    int lastRow = readerConfig.lastRow;
+    while (recordCount < lastRow && rowIterator.hasNext()) {
+      lineCount++;
+
+      Row row = rowIterator.next();
+
+      // If the user specified that there are no headers, get the column count
+      if (readerConfig.headerRow == -1 && recordCount == 0) {
+        this.totalColumnCount = row.getLastCellNum();
+      }
+
+      int colPosition = 0;
+      if (readerConfig.firstColumn != 0) {
+        colPosition = readerConfig.firstColumn - 1;
+      }
+
+      int finalColumn = totalColumnCount;
+      if (readerConfig.lastColumn != 0) {
+        finalColumn = readerConfig.lastColumn - 1;
+      }
+      rowWriter.start();
+      for (int colWriterIndex = 0; colPosition < finalColumn; colPosition++) {
+        Cell cell = row.getCell(colPosition);
+
+        CellValue cellValue = evaluator.evaluate(cell);
+
+        populateColumnArray(cell, cellValue, colPosition);
+        cellWriterArray.get(colWriterIndex).load(cell);
+
+        colWriterIndex++;
+      }
+
+      if (firstLine) {
+        firstLine = false;
+      }
+      rowWriter.save();
+      recordCount++;
+    }
+    return true;
+  }
+
+  /**
+   * Function to populate the column array
+   * @param cell
+   * @param cellValue
+   * @param colPosition
+   */
+  private void populateColumnArray(Cell cell, CellValue cellValue, int 
colPosition) {
+    if (!firstLine) {
+      return;
+    }
+
+    if (cellValue == null) {
+      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), 
MinorType.VARCHAR);
+    } else {
+      CellType cellType = cellValue.getCellTypeEnum();
+      if (cellType == CellType.STRING || readerConfig.readAllFieldsAsVarChar) {
+        addColumnToArray(rowWriter, excelFieldNames.get(colPosition), 
MinorType.VARCHAR);
+      } else if (cellType == CellType.NUMERIC && 
DateUtil.isCellDateFormatted(cell)) {
+        addColumnToArray(rowWriter, excelFieldNames.get(colPosition), 
MinorType.TIMESTAMP);
+      } else if (cellType == CellType.NUMERIC) {
+        addColumnToArray(rowWriter, excelFieldNames.get(colPosition), 
MinorType.FLOAT8);
+      } else {
+        logger.warn("Unknown data type.");
 
 Review comment:
   Include cell type into want message...
   If type is not supported which type it would have?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to