exceptionfactory commented on code in PR #7194: URL: https://github.com/apache/nifi/pull/7194#discussion_r1212152365
########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelHeaderSchemaStrategy.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaField; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.FieldTypeInference; +import org.apache.nifi.schema.inference.RecordSource; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.SchemaInferenceUtil; +import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.DataFormatter; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.EnumSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class ExcelHeaderSchemaStrategy implements SchemaAccessStrategy { + private static final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class); + + private final PropertyContext context; + + private final DataFormatter dataFormatter; + + private final ComponentLog logger; + + public ExcelHeaderSchemaStrategy(PropertyContext context, ComponentLog logger) { + this(context, logger, null); + } + + public ExcelHeaderSchemaStrategy(PropertyContext context, ComponentLog logger, Locale locale) { + this.context = context; + this.logger = logger; + this.dataFormatter = locale == null ? 
new DataFormatter() : new DataFormatter(locale); + } + + @Override + public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException { + if (this.context == null) { + throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema"); + } + + String errMsg = "Failed to read Header line from Excel worksheet"; + RecordSource<Row> recordSource; + try { + recordSource = new ExcelRecordSource(contentStream, context, variables, logger); + } catch (Exception e) { + throw new SchemaNotFoundException(errMsg, e); + } + + Row headerRow = recordSource.next(); + if (!ExcelUtils.hasCells(headerRow)) { + throw new SchemaNotFoundException("The chosen header line in the Excel worksheet had no cells"); + } + + try { + String dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); + String timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); + String timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); + final TimeValueInference timeValueInference = new TimeValueInference(dateFormat, timeFormat, timestampFormat); + final Map<String, FieldTypeInference> typeMap = new LinkedHashMap<>(); + IntStream.range(0, headerRow.getLastCellNum()) + .forEach(index -> { + final Cell cell = headerRow.getCell(index); + final String fieldName = Integer.toString(index); Review Comment: This approach uses the field index as the field name, instead of using the cell value as the field name, which would be the expected behavior. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelReader.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; +import org.apache.nifi.schema.inference.RecordSourceFactory; +import org.apache.nifi.schema.inference.SchemaInferenceEngine; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.MalformedRecordException; +import 
org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.IntStream; + +@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", "values", "cell"}) +@CapabilityDescription("Parses a Microsoft Excel document returning each row in each sheet as a separate record. " + + "This reader allows for inferring a schema either based on the first line of an Excel sheet if a 'header line' is " + + "present or from all the required sheets, or providing an explicit schema " + + "for interpreting the values. See Controller Service's Usage for further documentation. " + + "This reader is currently only capable of processing .xlsx " + + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.") +public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final AllowableValue HEADER_DERIVED = new AllowableValue("excel-header-derived", "Use Fields From Header", + "The first chosen row of the Excel sheet is a header row that contains the columns representative of all the rows " + + "in the required sheets. The schema will be derived by using those columns in the header."); + public static final PropertyDescriptor REQUIRED_SHEETS = new PropertyDescriptor + .Builder().name("Required Sheets") + .displayName("Required Sheets") + .description("Comma separated list of Excel document sheet names whose rows should be extracted from the excel document. 
If this property" + + " is left blank then all the rows from all the sheets will be extracted from the Excel document. The list of names is case sensitive. Any sheets not" + + " specified in this value will be ignored. An exception will be thrown if a specified sheet(s) are not found.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor STARTING_ROW = new PropertyDescriptor + .Builder().name("Starting Row") + .displayName("Starting Row") + .description("The row number of the first row to start processing (One based)." + + " Use this to skip over rows of data at the top of a worksheet that are not part of the dataset.") + .required(true) + .defaultValue("1") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + private AtomicReferenceArray<String> requiredSheets; + private volatile int firstRow; + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.firstRow = getStartingRow(context); + String[] rawRequiredSheets = getRawRequiredSheets(context); + this.requiredSheets = new AtomicReferenceArray<>(rawRequiredSheets.length); + IntStream.range(0, rawRequiredSheets.length) + .forEach(index -> this.requiredSheets.set(index, rawRequiredSheets[index])); + this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); + this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); + this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); + } + + @Override + public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, 
SchemaNotFoundException { + // Use Mark/Reset of a BufferedInputStream in case we read from the Input Stream for the header. + in.mark(1024 * 1024); + final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(in), null); + in.reset(); + + ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder() + .withDateFormat(dateFormat) + .withRequiredSheets(requiredSheets) + .withFirstRow(firstRow) + .withSchema(schema) + .withTimeFormat(timeFormat) + .withTimestampFormat(timestampFormat) + .build(); + + return new ExcelRecordReader(configuration, in, logger); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(REQUIRED_SHEETS); + properties.add(STARTING_ROW); Review Comment: Minor adjustment, recommend placing the required Starting Row property before the optional Required Sheets property. ```suggestion properties.add(STARTING_ROW); properties.add(REQUIRED_SHEETS); ``` ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/RowIterator.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.excel; + +import com.github.pjfanning.xlsx.StreamingReader; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.poi.ss.usermodel.Row; +import org.apache.poi.ss.usermodel.Sheet; +import org.apache.poi.ss.usermodel.Workbook; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class RowIterator implements Iterator<Row>, Closeable { + private final Workbook workbook; + private final Iterator<Sheet> sheets; + private final Map<String, Boolean> requiredSheets; + private final int firstRow; + private final ComponentLog logger; + private Sheet currentSheet; + private Iterator<Row> currentRows; + private Row currentRow; + + public RowIterator(InputStream in, List<String> requiredSheets, int firstRow, ComponentLog logger) { + this.workbook = StreamingReader.builder() + .rowCacheSize(100) + .bufferSize(4096) + .open(in); + this.sheets = this.workbook.iterator(); + this.requiredSheets = requiredSheets != null ? 
requiredSheets.stream() + .collect(Collectors.toMap(key -> key, value -> Boolean.FALSE)) : new HashMap<>(); + this.firstRow = firstRow; + this.logger = logger; + } + + @Override + public boolean hasNext() { + setCurrent(); + boolean next = currentRow != null; + if(!next) { + String requiredSheetsNotFoundMessage = getRequiredSheetsNotFoundMessage(); + if (!requiredSheetsNotFoundMessage.isEmpty()) { + throw new ProcessException("The following required Excel sheet(s) were not found " + requiredSheetsNotFoundMessage); Review Comment: Recommend a minor wording adjustment: ```suggestion throw new ProcessException("Required Excel Sheets not found " + requiredSheetsNotFoundMessage); ``` ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelRecordReaderConfiguration.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.nifi.serialization.record.RecordSchema; + +import java.util.concurrent.atomic.AtomicReferenceArray; + +public class ExcelRecordReaderConfiguration { + private RecordSchema schema; + private AtomicReferenceArray<String> requiredSheets; Review Comment: Is there a reason for using AtomicReferenceArray as opposed to a simple List? This property should not be modifiable once it is defined. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/excel/TestExcelHeaderSchemaStrategy.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.MockConfigurationContext; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@ExtendWith(MockitoExtension.class) +public class TestExcelHeaderSchemaStrategy { + @Mock + ComponentLog logger; + + @ParameterizedTest + @MethodSource("getLocales") + public void testInferenceAgainstDifferentLocales(Locale locale) throws IOException, SchemaNotFoundException { + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + new ExcelReader().getSupportedPropertyDescriptors().forEach(prop -> properties.put(prop, prop.getDefaultValue())); + final PropertyContext context = new MockConfigurationContext(properties, null); + ExcelHeaderSchemaStrategy headerSchemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, locale); + + RecordSchema schema = headerSchemaStrategy.getSchema(null, getInputStream("numbers.xlsx"), null); + + final List<String> fieldNames = schema.getFieldNames(); + 
assertEquals(Collections.singletonList("0"), fieldNames); + if (Locale.FRENCH.equals(locale)) { + assertEquals(RecordFieldType.STRING, schema.getDataType("0").get().getFieldType()); + } else { + assertEquals(RecordFieldType.FLOAT, schema.getDataType("0").get().getFieldType()); + } + } + + private static Stream<Arguments> getLocales() { + Locale hindi = new Locale("hin"); + return Stream.of( + Arguments.of(Locale.ENGLISH), + Arguments.of(hindi), + Arguments.of(Locale.JAPANESE), + Arguments.of(Locale.FRENCH) + ); + } + + private InputStream getInputStream(String excelFile) throws IOException { + String excelResourcesDir = "src/test/resources/excel"; + Path excelDoc = Paths.get(excelResourcesDir, excelFile); + return Files.newInputStream(excelDoc); + } + + @Test + public void testColumnHeaders() throws IOException, SchemaNotFoundException { + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + new ExcelReader().getSupportedPropertyDescriptors().forEach(prop -> properties.put(prop, prop.getDefaultValue())); + final PropertyContext context = new MockConfigurationContext(properties, null); + ExcelHeaderSchemaStrategy headerSchemaStrategy = new ExcelHeaderSchemaStrategy(context, logger); + + RecordSchema schema = headerSchemaStrategy.getSchema(null, getInputStream("simpleDataFormatting.xlsx"), null); + + final List<String> fieldNames = schema.getFieldNames(); + assertEquals(Arrays.asList("0", "1", "2", "3"), fieldNames); + assertEquals(RecordFieldType.STRING.getDataType(), schema.getDataType("0").get()); + assertEquals(RecordFieldType.STRING.getDataType(), schema.getDataType("1").get()); + assertEquals(RecordFieldType.STRING, schema.getDataType("2").get().getFieldType()); + assertEquals(RecordFieldType.STRING.getDataType(), schema.getDataType("3").get()); + } + + @Test + public void testAfterColumnHeaders() throws IOException, SchemaNotFoundException{ + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + new 
ExcelReader().getSupportedPropertyDescriptors().forEach(prop -> properties.put(prop, prop.getDefaultValue())); + final PropertyContext context = new MockConfigurationContext(properties, null); + properties.put(ExcelReader.STARTING_ROW, "2"); + ExcelHeaderSchemaStrategy headerSchemaStrategy = new ExcelHeaderSchemaStrategy(context, logger); + + RecordSchema schema = headerSchemaStrategy.getSchema(null, getInputStream("simpleDataFormatting.xlsx"), null); + + final List<String> fieldNames = schema.getFieldNames(); + assertEquals(Arrays.asList("0", "1", "2", "3"), fieldNames); Review Comment: Instead of numbers, these fields should reflect the cell values from the first row. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelHeaderSchemaStrategy.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaField; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.FieldTypeInference; +import org.apache.nifi.schema.inference.RecordSource; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.SchemaInferenceUtil; +import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.DataFormatter; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.EnumSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class ExcelHeaderSchemaStrategy implements SchemaAccessStrategy { + private static final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class); + + private final PropertyContext context; + + private final DataFormatter dataFormatter; + + private final ComponentLog logger; + + public ExcelHeaderSchemaStrategy(PropertyContext context, ComponentLog logger) { + this(context, logger, null); + } + + public ExcelHeaderSchemaStrategy(PropertyContext context, ComponentLog logger, Locale locale) { + this.context = context; + this.logger = logger; + this.dataFormatter = locale == null ? 
new DataFormatter() : new DataFormatter(locale); + } + + @Override + public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException { + if (this.context == null) { + throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema"); + } + + String errMsg = "Failed to read Header line from Excel worksheet"; + RecordSource<Row> recordSource; + try { + recordSource = new ExcelRecordSource(contentStream, context, variables, logger); + } catch (Exception e) { + throw new SchemaNotFoundException(errMsg, e); + } + + Row headerRow = recordSource.next(); + if (!ExcelUtils.hasCells(headerRow)) { + throw new SchemaNotFoundException("The chosen header line in the Excel worksheet had no cells"); + } + + try { + String dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); + String timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); + String timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); + final TimeValueInference timeValueInference = new TimeValueInference(dateFormat, timeFormat, timestampFormat); + final Map<String, FieldTypeInference> typeMap = new LinkedHashMap<>(); + IntStream.range(0, headerRow.getLastCellNum()) + .forEach(index -> { + final Cell cell = headerRow.getCell(index); + final String fieldName = Integer.toString(index); + final FieldTypeInference typeInference = typeMap.computeIfAbsent(fieldName, key -> new FieldTypeInference()); + final String formattedCellValue = dataFormatter.formatCellValue(cell); + final DataType dataType = SchemaInferenceUtil.getDataType(formattedCellValue, timeValueInference); + typeInference.addPossibleDataType(dataType); + }); + + final List<RecordField> fields = typeMap.entrySet().stream() + .map(entry -> new RecordField(entry.getKey(), entry.getValue().toDataType(), true)) + .collect(Collectors.toList()); + + return 
new SimpleRecordSchema(fields); + } catch (Exception e) { + throw new SchemaNotFoundException(errMsg, e); Review Comment: This message should be different from above. The reader returned a row, but processing failed. ```suggestion throw new SchemaNotFoundException("Failed to create Schema from Header Row", e); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org