exceptionfactory commented on code in PR #7194:
URL: https://github.com/apache/nifi/pull/7194#discussion_r1212152365


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelHeaderSchemaStrategy.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.excel;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.FieldTypeInference;
+import org.apache.nifi.schema.inference.RecordSource;
+import org.apache.nifi.schema.inference.TimeValueInference;
+import org.apache.nifi.serialization.DateTimeUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.SchemaInferenceUtil;
+import org.apache.poi.ss.usermodel.Cell;
+import org.apache.poi.ss.usermodel.DataFormatter;
+import org.apache.poi.ss.usermodel.Row;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class ExcelHeaderSchemaStrategy implements SchemaAccessStrategy {
+    private static final Set<SchemaField> schemaFields = 
EnumSet.noneOf(SchemaField.class);
+
+    private final PropertyContext context;
+
+    private final DataFormatter dataFormatter;
+
+    private final ComponentLog logger;
+
+    public ExcelHeaderSchemaStrategy(PropertyContext context, ComponentLog 
logger) {
+        this(context, logger, null);
+    }
+
+    public ExcelHeaderSchemaStrategy(PropertyContext context, ComponentLog 
logger, Locale locale) {
+        this.context = context;
+        this.logger = logger;
+        this.dataFormatter = locale == null ? new DataFormatter() : new 
DataFormatter(locale);
+    }
+
+    @Override
+    public RecordSchema getSchema(Map<String, String> variables, InputStream 
contentStream, RecordSchema readSchema) throws SchemaNotFoundException, 
IOException {
+        if (this.context == null) {
+            throw new SchemaNotFoundException("Schema Access Strategy intended 
only for validation purposes and cannot obtain schema");
+        }
+
+        String errMsg = "Failed to read Header line from Excel worksheet";
+        RecordSource<Row> recordSource;
+        try {
+            recordSource = new ExcelRecordSource(contentStream, context, 
variables, logger);
+        } catch (Exception e) {
+            throw new SchemaNotFoundException(errMsg, e);
+        }
+
+        Row headerRow = recordSource.next();
+        if (!ExcelUtils.hasCells(headerRow)) {
+            throw new SchemaNotFoundException("The chosen header line in the 
Excel worksheet had no cells");
+        }
+
+        try {
+            String dateFormat = 
context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
+            String timeFormat = 
context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
+            String timestampFormat = 
context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+            final TimeValueInference timeValueInference = new 
TimeValueInference(dateFormat, timeFormat, timestampFormat);
+            final Map<String, FieldTypeInference> typeMap = new 
LinkedHashMap<>();
+            IntStream.range(0, headerRow.getLastCellNum())
+                    .forEach(index -> {
+                        final Cell cell = headerRow.getCell(index);
+                        final String fieldName = Integer.toString(index);

Review Comment:
   This approach uses the field index as the field name, instead of using the 
cell value as the field name, which would be the expected behavior.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelReader.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.excel;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
+import org.apache.nifi.schema.inference.RecordSourceFactory;
+import org.apache.nifi.schema.inference.SchemaInferenceEngine;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.schema.inference.TimeValueInference;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.DateTimeUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.stream.io.NonCloseableInputStream;
+import org.apache.poi.ss.usermodel.Row;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.stream.IntStream;
+
+@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", 
"values", "cell"})
+@CapabilityDescription("Parses a Microsoft Excel document returning each row 
in each sheet as a separate record. "
+        + "This reader allows for inferring a schema either based on the first 
line of an Excel sheet if a 'header line' is "
+        + "present or from all the required sheets, or providing an explicit 
schema "
+        + "for interpreting the values. See Controller Service's Usage for 
further documentation. "
+        + "This reader is currently only capable of processing .xlsx "
+        + "(XSSF 2007 OOXML file format) Excel documents and not older .xls 
(HSSF '97(-2007) file format) documents.")
+public class ExcelReader extends SchemaRegistryService implements 
RecordReaderFactory {
+
+    private static final AllowableValue HEADER_DERIVED = new 
AllowableValue("excel-header-derived", "Use Fields From Header",
+            "The first chosen row of the Excel sheet is a header row that 
contains the columns representative of all the rows " +
+                    "in the required sheets. The schema will be derived by 
using those columns in the header.");
+    public static final PropertyDescriptor REQUIRED_SHEETS = new 
PropertyDescriptor
+            .Builder().name("Required Sheets")
+            .displayName("Required Sheets")
+            .description("Comma separated list of Excel document sheet names 
whose rows should be extracted from the excel document. If this property" +
+                    " is left blank then all the rows from all the sheets will 
be extracted from the Excel document. The list of names is case sensitive. Any 
sheets not" +
+                    " specified in this value will be ignored. An exception 
will be thrown if a specified sheet(s) are not found.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor STARTING_ROW = new 
PropertyDescriptor
+            .Builder().name("Starting Row")
+            .displayName("Starting Row")
+            .description("The row number of the first row to start processing 
(One based)."
+                    + " Use this to skip over rows of data at the top of a 
worksheet that are not part of the dataset.")
+            .required(true)
+            .defaultValue("1")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private AtomicReferenceArray<String> requiredSheets;
+    private volatile int firstRow;
+    private volatile String dateFormat;
+    private volatile String timeFormat;
+    private volatile String timestampFormat;
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.firstRow = getStartingRow(context);
+        String[] rawRequiredSheets = getRawRequiredSheets(context);
+        this.requiredSheets = new 
AtomicReferenceArray<>(rawRequiredSheets.length);
+        IntStream.range(0, rawRequiredSheets.length)
+                .forEach(index -> this.requiredSheets.set(index, 
rawRequiredSheets[index]));
+        this.dateFormat = 
context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
+        this.timeFormat = 
context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
+        this.timestampFormat = 
context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+    }
+
+    @Override
+    public RecordReader createRecordReader(Map<String, String> variables, 
InputStream in, long inputLength, ComponentLog logger) throws 
MalformedRecordException, IOException, SchemaNotFoundException {
+        // Use Mark/Reset of a BufferedInputStream in case we read from the 
Input Stream for the header.
+        in.mark(1024 * 1024);
+        final RecordSchema schema = getSchema(variables, new 
NonCloseableInputStream(in), null);
+        in.reset();
+
+        ExcelRecordReaderConfiguration configuration = new 
ExcelRecordReaderConfiguration.Builder()
+                .withDateFormat(dateFormat)
+                .withRequiredSheets(requiredSheets)
+                .withFirstRow(firstRow)
+                .withSchema(schema)
+                .withTimeFormat(timeFormat)
+                .withTimestampFormat(timestampFormat)
+                .build();
+
+        return new ExcelRecordReader(configuration, in, logger);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(REQUIRED_SHEETS);
+        properties.add(STARTING_ROW);

Review Comment:
   Minor adjustment, recommend placing the required Starting Row property 
before the optional Required Sheets property.
   ```suggestion
           properties.add(STARTING_ROW);
           properties.add(REQUIRED_SHEETS);
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/RowIterator.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.excel;
+
+import com.github.pjfanning.xlsx.StreamingReader;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.poi.ss.usermodel.Row;
+import org.apache.poi.ss.usermodel.Sheet;
+import org.apache.poi.ss.usermodel.Workbook;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RowIterator implements Iterator<Row>, Closeable {
+    private final Workbook workbook;
+    private final Iterator<Sheet> sheets;
+    private final Map<String, Boolean> requiredSheets;
+    private final int firstRow;
+    private final ComponentLog logger;
+    private Sheet currentSheet;
+    private Iterator<Row> currentRows;
+    private Row currentRow;
+
+    public RowIterator(InputStream in, List<String> requiredSheets, int 
firstRow, ComponentLog logger) {
+        this.workbook = StreamingReader.builder()
+                .rowCacheSize(100)
+                .bufferSize(4096)
+                .open(in);
+        this.sheets = this.workbook.iterator();
+        this.requiredSheets = requiredSheets != null ? requiredSheets.stream()
+                .collect(Collectors.toMap(key -> key, value -> Boolean.FALSE)) 
: new HashMap<>();
+        this.firstRow = firstRow;
+        this.logger = logger;
+    }
+
+    @Override
+    public boolean hasNext() {
+        setCurrent();
+        boolean next = currentRow != null;
+        if(!next) {
+            String requiredSheetsNotFoundMessage = 
getRequiredSheetsNotFoundMessage();
+            if (!requiredSheetsNotFoundMessage.isEmpty()) {
+                throw new ProcessException("The following required Excel 
sheet(s) were not found " + requiredSheetsNotFoundMessage);

Review Comment:
   Recommend a minor wording adjustment:
   ```suggestion
                   throw new ProcessException("Required Excel Sheets not found 
" + requiredSheetsNotFoundMessage);
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelRecordReaderConfiguration.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.excel;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+public class ExcelRecordReaderConfiguration {
+    private RecordSchema schema;
+    private AtomicReferenceArray<String> requiredSheets;

Review Comment:
   Is there a reason for using AtomicReferenceArray as opposed to a simple 
List? This property should not be modifiable once it is defined.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/excel/TestExcelHeaderSchemaStrategy.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.excel;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(MockitoExtension.class)
+public class TestExcelHeaderSchemaStrategy {
+    @Mock
+    ComponentLog logger;
+
+    @ParameterizedTest
+    @MethodSource("getLocales")
+    public void testInferenceAgainstDifferentLocales(Locale locale) throws 
IOException, SchemaNotFoundException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        new ExcelReader().getSupportedPropertyDescriptors().forEach(prop -> 
properties.put(prop, prop.getDefaultValue()));
+        final PropertyContext context = new 
MockConfigurationContext(properties, null);
+        ExcelHeaderSchemaStrategy headerSchemaStrategy = new 
ExcelHeaderSchemaStrategy(context, logger, locale);
+
+        RecordSchema schema = headerSchemaStrategy.getSchema(null, 
getInputStream("numbers.xlsx"), null);
+
+        final List<String> fieldNames = schema.getFieldNames();
+        assertEquals(Collections.singletonList("0"), fieldNames);
+        if (Locale.FRENCH.equals(locale)) {
+            assertEquals(RecordFieldType.STRING, 
schema.getDataType("0").get().getFieldType());
+        } else {
+            assertEquals(RecordFieldType.FLOAT, 
schema.getDataType("0").get().getFieldType());
+        }
+    }
+
+    private static Stream<Arguments> getLocales() {
+        Locale hindi = new Locale("hin");
+        return Stream.of(
+                Arguments.of(Locale.ENGLISH),
+                Arguments.of(hindi),
+                Arguments.of(Locale.JAPANESE),
+                Arguments.of(Locale.FRENCH)
+        );
+    }
+
+    private InputStream getInputStream(String excelFile) throws IOException {
+        String excelResourcesDir = "src/test/resources/excel";
+        Path excelDoc = Paths.get(excelResourcesDir, excelFile);
+        return Files.newInputStream(excelDoc);
+    }
+
+    @Test
+    public void testColumnHeaders() throws IOException, 
SchemaNotFoundException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        new ExcelReader().getSupportedPropertyDescriptors().forEach(prop -> 
properties.put(prop, prop.getDefaultValue()));
+        final PropertyContext context = new 
MockConfigurationContext(properties, null);
+        ExcelHeaderSchemaStrategy headerSchemaStrategy = new 
ExcelHeaderSchemaStrategy(context, logger);
+
+        RecordSchema schema = headerSchemaStrategy.getSchema(null, 
getInputStream("simpleDataFormatting.xlsx"), null);
+
+        final List<String> fieldNames = schema.getFieldNames();
+        assertEquals(Arrays.asList("0", "1", "2", "3"), fieldNames);
+        assertEquals(RecordFieldType.STRING.getDataType(), 
schema.getDataType("0").get());
+        assertEquals(RecordFieldType.STRING.getDataType(), 
schema.getDataType("1").get());
+        assertEquals(RecordFieldType.STRING, 
schema.getDataType("2").get().getFieldType());
+        assertEquals(RecordFieldType.STRING.getDataType(), 
schema.getDataType("3").get());
+    }
+
+    @Test
+    public void testAfterColumnHeaders() throws IOException, 
SchemaNotFoundException{
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        new ExcelReader().getSupportedPropertyDescriptors().forEach(prop -> 
properties.put(prop, prop.getDefaultValue()));
+        final PropertyContext context = new 
MockConfigurationContext(properties, null);
+        properties.put(ExcelReader.STARTING_ROW, "2");
+        ExcelHeaderSchemaStrategy headerSchemaStrategy = new 
ExcelHeaderSchemaStrategy(context, logger);
+
+        RecordSchema schema = headerSchemaStrategy.getSchema(null, 
getInputStream("simpleDataFormatting.xlsx"), null);
+
+        final List<String> fieldNames = schema.getFieldNames();
+        assertEquals(Arrays.asList("0", "1", "2", "3"), fieldNames);

Review Comment:
   Instead of numbers, these fields should reflect the cell values from the 
first row.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelHeaderSchemaStrategy.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.excel;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.FieldTypeInference;
+import org.apache.nifi.schema.inference.RecordSource;
+import org.apache.nifi.schema.inference.TimeValueInference;
+import org.apache.nifi.serialization.DateTimeUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.SchemaInferenceUtil;
+import org.apache.poi.ss.usermodel.Cell;
+import org.apache.poi.ss.usermodel.DataFormatter;
+import org.apache.poi.ss.usermodel.Row;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class ExcelHeaderSchemaStrategy implements SchemaAccessStrategy {
+    private static final Set<SchemaField> schemaFields = 
EnumSet.noneOf(SchemaField.class);
+
+    private final PropertyContext context;
+
+    private final DataFormatter dataFormatter;
+
+    private final ComponentLog logger;
+
+    public ExcelHeaderSchemaStrategy(PropertyContext context, ComponentLog 
logger) {
+        this(context, logger, null);
+    }
+
+    public ExcelHeaderSchemaStrategy(PropertyContext context, ComponentLog 
logger, Locale locale) {
+        this.context = context;
+        this.logger = logger;
+        this.dataFormatter = locale == null ? new DataFormatter() : new 
DataFormatter(locale);
+    }
+
+    @Override
+    public RecordSchema getSchema(Map<String, String> variables, InputStream 
contentStream, RecordSchema readSchema) throws SchemaNotFoundException, 
IOException {
+        if (this.context == null) {
+            throw new SchemaNotFoundException("Schema Access Strategy intended 
only for validation purposes and cannot obtain schema");
+        }
+
+        String errMsg = "Failed to read Header line from Excel worksheet";
+        RecordSource<Row> recordSource;
+        try {
+            recordSource = new ExcelRecordSource(contentStream, context, 
variables, logger);
+        } catch (Exception e) {
+            throw new SchemaNotFoundException(errMsg, e);
+        }
+
+        Row headerRow = recordSource.next();
+        if (!ExcelUtils.hasCells(headerRow)) {
+            throw new SchemaNotFoundException("The chosen header line in the 
Excel worksheet had no cells");
+        }
+
+        try {
+            String dateFormat = 
context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
+            String timeFormat = 
context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
+            String timestampFormat = 
context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+            final TimeValueInference timeValueInference = new 
TimeValueInference(dateFormat, timeFormat, timestampFormat);
+            final Map<String, FieldTypeInference> typeMap = new 
LinkedHashMap<>();
+            IntStream.range(0, headerRow.getLastCellNum())
+                    .forEach(index -> {
+                        final Cell cell = headerRow.getCell(index);
+                        final String fieldName = Integer.toString(index);
+                        final FieldTypeInference typeInference = 
typeMap.computeIfAbsent(fieldName, key -> new FieldTypeInference());
+                        final String formattedCellValue = 
dataFormatter.formatCellValue(cell);
+                        final DataType dataType = 
SchemaInferenceUtil.getDataType(formattedCellValue, timeValueInference);
+                        typeInference.addPossibleDataType(dataType);
+                    });
+
+            final List<RecordField> fields = typeMap.entrySet().stream()
+                    .map(entry -> new RecordField(entry.getKey(), 
entry.getValue().toDataType(), true))
+                    .collect(Collectors.toList());
+
+            return new SimpleRecordSchema(fields);
+        } catch (Exception e) {
+            throw new SchemaNotFoundException(errMsg, e);

Review Comment:
   This message should be different from the one above. The reader returned a 
row, but processing failed.
   ```suggestion
               throw new SchemaNotFoundException("Failed to create Schema from 
Header Row", e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to