This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 6e69b7009c NIFI-13304 Added SplitExcel Processor 6e69b7009c is described below commit 6e69b7009c669b0aaf554b5cb5e17b2fdc5decd6 Author: dan-s1 <dsti...@gmail.com> AuthorDate: Wed Jun 26 17:52:56 2024 +0000 NIFI-13304 Added SplitExcel Processor This closes #9011 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../org/apache/nifi/processors/poi/SplitExcel.java | 284 +++++++++++++++++++++ .../services/org.apache.nifi.processor.Processor | 3 +- .../apache/nifi/processors/poi/TestSplitExcel.java | 136 ++++++++++ .../src/test/resources/dataWithSharedFormula.xlsx | Bin 0 -> 16865 bytes .../src/test/resources/dates.xlsx | Bin 0 -> 4574 bytes .../src/test/resources/sheetsWithEmptySheet.xlsx | Bin 0 -> 7445 bytes 6 files changed, 422 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/main/java/org/apache/nifi/processors/poi/SplitExcel.java b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/main/java/org/apache/nifi/processors/poi/SplitExcel.java new file mode 100644 index 0000000000..c1a75b936f --- /dev/null +++ b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/main/java/org/apache/nifi/processors/poi/SplitExcel.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.poi; + +import com.github.pjfanning.xlsx.StreamingReader; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.poi.ss.usermodel.CellCopyPolicy; +import org.apache.poi.ss.usermodel.Row; +import org.apache.poi.ss.usermodel.Sheet; +import org.apache.poi.ss.usermodel.Workbook; +import org.apache.poi.xssf.usermodel.XSSFSheet; +import org.apache.poi.xssf.usermodel.XSSFWorkbook; + +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import 
java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+
+/**
+ * Processor that splits an .xlsx workbook into one new single-sheet workbook per sheet.
+ *
+ * <p>The rows of every sheet are copied into a fresh {@link XSSFWorkbook} that is written to a child
+ * FlowFile and routed to {@link #REL_SPLIT}. The incoming FlowFile is routed to {@link #REL_ORIGINAL}
+ * on success. If a {@link RuntimeException} is raised while reading or writing, every child FlowFile
+ * created so far is removed and the incoming FlowFile is routed to {@link #REL_FAILURE}.</p>
+ */
+@SideEffectFree
+@SupportsBatching
+@Tags({"split", "text"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Splits a multi sheet Microsoft Excel spreadsheet into multiple Microsoft Excel spreadsheets where each sheet from the original"
+        + " file is converted to an individual spreadsheet in its own flow file. This processor is currently only capable of processing .xlsx "
+        + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "fragment.identifier", description = "All split Excel FlowFiles produced from the same parent Excel FlowFile will have the same randomly generated UUID added" +
+                " for this attribute"),
+        @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split Excel FlowFiles that were created from a single parent Excel FlowFile"),
+        @WritesAttribute(attribute = "fragment.count", description = "The number of split Excel FlowFiles generated from the parent Excel FlowFile"),
+        @WritesAttribute(attribute = "segment.original.filename", description = "The filename of the parent Excel FlowFile"),
+        @WritesAttribute(attribute = SplitExcel.SHEET_NAME, description = "The name of the Excel sheet from the original spreadsheet."),
+        @WritesAttribute(attribute = SplitExcel.TOTAL_ROWS, description = "The number of rows in the Excel sheet from the original spreadsheet.")})
+public class SplitExcel extends AbstractProcessor {
+    /** Allowable values of the "Protection Type" property. */
+    public enum ProtectionType implements DescribedValue {
+        UNPROTECTED("Unprotected", "An Excel spreadsheet not protected by a password"),
+        PASSWORD("Password Protected", "An Excel spreadsheet protected by a password");
+
+        private final String displayName;
+        private final String description;
+
+        ProtectionType(String displayName, String description) {
+            this.displayName = displayName;
+            this.description = description;
+        }
+
+        /** @return the enum constant name, used as the stored property value */
+        @Override
+        public String getValue() {
+            return name();
+        }
+
+        @Override
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+
+    /** Attribute holding the name of the sheet a split was created from. */
+    public static final String SHEET_NAME = "sheetname";
+    /** Attribute holding the number of rows copied from the original sheet. */
+    public static final String TOTAL_ROWS = "total.rows";
+
+    public static final PropertyDescriptor PROTECTION_TYPE = new PropertyDescriptor.Builder()
+            .name("Protection Type")
+            .description("Specifies whether an Excel spreadsheet is protected by a password or not.")
+            .required(true)
+            .allowableValues(ProtectionType.class)
+            .defaultValue(ProtectionType.UNPROTECTED.getValue())
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password for a password protected Excel spreadsheet")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROTECTION_TYPE, ProtectionType.PASSWORD)
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship.")
+            .build();
+
+    public static final Relationship REL_SPLIT = new Relationship.Builder()
+            .name("split")
+            .description("The individual Excel 'segments' of the original Excel FlowFile will be routed to this relationship.")
+            .build();
+
+    private static final List<PropertyDescriptor> DESCRIPTORS;
+    private static final Set<Relationship> RELATIONSHIPS;
+    // Copy policy built entirely from the CellCopyPolicy default constants
+    private static final CellCopyPolicy CELL_COPY_POLICY = new CellCopyPolicy.Builder()
+            .cellFormula(CellCopyPolicy.DEFAULT_COPY_CELL_FORMULA_POLICY)
+            .cellStyle(CellCopyPolicy.DEFAULT_COPY_CELL_STYLE_POLICY)
+            .cellValue(CellCopyPolicy.DEFAULT_COPY_CELL_VALUE_POLICY)
+            .condenseRows(CellCopyPolicy.DEFAULT_CONDENSE_ROWS_POLICY)
+            .copyHyperlink(CellCopyPolicy.DEFAULT_COPY_HYPERLINK_POLICY)
+            .mergeHyperlink(CellCopyPolicy.DEFAULT_MERGE_HYPERLINK_POLICY)
+            .mergedRegions(CellCopyPolicy.DEFAULT_COPY_MERGED_REGIONS_POLICY)
+            .rowHeight(CellCopyPolicy.DEFAULT_COPY_ROW_HEIGHT_POLICY)
+            .build();
+
+    static {
+        DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(PROTECTION_TYPE, PASSWORD));
+        RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_ORIGINAL, REL_FAILURE, REL_SPLIT)));
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    /**
+     * Reads the incoming FlowFile as a streaming workbook and emits one child FlowFile per sheet.
+     *
+     * @param context supplies the configured {@link #PASSWORD} value
+     * @param session used to read the input and to create, write, remove and transfer FlowFiles
+     * @throws ProcessException declared by the overridden method; RuntimeExceptions raised while
+     *         splitting are caught here and result in routing to {@link #REL_FAILURE}
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile originalFlowFile = session.get();
+        if (originalFlowFile == null) {
+            return;
+        }
+
+        final String password = context.getProperty(PASSWORD).getValue();
+        final List<WorkbookSplit> workbookSplits = new ArrayList<>();
+
+        try {
+            session.read(originalFlowFile, in -> {
+                // try-with-resources closes the streaming workbook on success and on failure
+                try (final Workbook originalWorkbook = StreamingReader.builder()
+                        .rowCacheSize(100)
+                        .bufferSize(4096)
+                        .password(password)
+                        .setReadHyperlinks(true) // NOTE: Needed for copying rows.
+                        .setReadSharedFormulas(true) // NOTE: If not set to true, then data with shared formulas fail.
+                        .open(in)) {
+
+                    int index = 0;
+                    for (final Sheet originalSheet : originalWorkbook) {
+                        final String originalSheetName = originalSheet.getSheetName();
+                        try (XSSFWorkbook newWorkbook = new XSSFWorkbook()) {
+                            final XSSFSheet newSheet = newWorkbook.createSheet(originalSheetName);
+                            final List<Row> originalRows = new ArrayList<>();
+                            for (Row originalRow : originalSheet) {
+                                originalRows.add(originalRow);
+                            }
+
+                            if (!originalRows.isEmpty()) {
+                                newSheet.copyRows(originalRows, originalSheet.getFirstRowNum(), CELL_COPY_POLICY);
+                            }
+
+                            final FlowFile newFlowFile = session.create(originalFlowFile);
+                            // Register the child before writing so that a failed write still leaves it
+                            // in the list that the failure handler removes from the session.
+                            workbookSplits.add(new WorkbookSplit(index, newFlowFile, originalSheetName, originalRows.size()));
+                            try (final OutputStream out = session.write(newFlowFile)) {
+                                newWorkbook.write(out);
+                            }
+                        }
+
+                        index++;
+                    }
+                }
+            });
+        } catch (RuntimeException e) {
+            getLogger().error("Failed to split {}", originalFlowFile, e);
+            // Discard every child created so far; only the untouched input goes to failure
+            session.remove(workbookSplits.stream()
+                    .map(WorkbookSplit::content)
+                    .collect(Collectors.toList()));
+            workbookSplits.clear();
+            session.transfer(originalFlowFile, REL_FAILURE);
+            return;
+        }
+
+        final String fragmentId = UUID.randomUUID().toString();
+        final String originalFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key());
+        final int extensionIndex = originalFileName.lastIndexOf(".");
+        String originalFileNameWithoutExtension = originalFileName;
+        String originalFileNameExtension = "";
+
+        // Split "name.ext" at the last dot; the extension keeps its leading dot
+        if (extensionIndex > -1) {
+            originalFileNameWithoutExtension = originalFileName.substring(0, extensionIndex);
+            originalFileNameExtension = originalFileName.substring(extensionIndex);
+        }
+
+        // Attributes shared by all splits; the per-split entries are overwritten on each iteration
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(FRAGMENT_COUNT.key(), String.valueOf(workbookSplits.size()));
+        attributes.put(FRAGMENT_ID.key(), fragmentId);
+        attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), originalFileName);
+
+        for (WorkbookSplit split : workbookSplits) {
+            attributes.put(CoreAttributes.FILENAME.key(), String.format("%s-%s%s", originalFileNameWithoutExtension, split.index(), originalFileNameExtension));
+            attributes.put(FRAGMENT_INDEX.key(), Integer.toString(split.index()));
+            attributes.put(SHEET_NAME, split.sheetName());
+            attributes.put(TOTAL_ROWS, Integer.toString(split.numRows()));
+            session.putAllAttributes(split.content(), attributes);
+        }
+
+        session.transfer(originalFlowFile, REL_ORIGINAL);
+        final List<FlowFile> flowFileSplits = workbookSplits.stream()
+                .map(WorkbookSplit::content)
+                .collect(Collectors.toList());
+
+        session.transfer(flowFileSplits, REL_SPLIT);
+    }
+
+    /** Immutable holder pairing a child FlowFile with the sheet it was created from. */
+    private static class WorkbookSplit {
+        private final int index;
+        private final FlowFile content;
+        private final String sheetName;
+        private final int numRows;
+
+        public WorkbookSplit(int index, FlowFile content, String sheetName, int numRows) {
+            this.index = index;
+            this.content = content;
+            this.sheetName = sheetName;
+            this.numRows = numRows;
+        }
+
+        /** @return zero-based position of the sheet in iteration order */
+        public int index() {
+            return index;
+        }
+
+        /** @return the child FlowFile created for the sheet */
+        public FlowFile content() {
+            return content;
+        }
+
+        /** @return name of the originating sheet */
+        public String sheetName() {
+            return sheetName;
+        }
+
+        /** @return number of rows collected from the originating sheet */
+        public int numRows() {
+            return numRows;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 43baa0b62f..17b3ae3c34 100644
---
a/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,4 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.poi.ConvertExcelToCSVProcessor \ No newline at end of file +org.apache.nifi.processors.poi.ConvertExcelToCSVProcessor +org.apache.nifi.processors.poi.SplitExcel \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/java/org/apache/nifi/processors/poi/TestSplitExcel.java b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/java/org/apache/nifi/processors/poi/TestSplitExcel.java new file mode 100644 index 0000000000..3bf5b8e800 --- /dev/null +++ b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/java/org/apache/nifi/processors/poi/TestSplitExcel.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.processors.poi;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Exercises SplitExcel against the workbook fixtures under src/test/resources.
+ */
+public class TestSplitExcel {
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() {
+        runner = TestRunners.newTestRunner(SplitExcel.class);
+    }
+
+    @Test
+    void testSingleSheet() throws IOException {
+        runner.enqueue(Paths.get("src/test/resources/dates.xlsx"));
+        runner.run();
+
+        assertTransferCounts(1, 1, 0);
+    }
+
+    @Test
+    void testMultisheet() throws IOException {
+        final Path workbook = Paths.get("src/test/resources/TwoSheets.xlsx");
+        runner.enqueue(workbook);
+        runner.run();
+
+        assertTransferCounts(2, 1, 0);
+        assertSplitAttributes(
+                workbook.getFileName().toString(),
+                Arrays.asList("TestSheetA", "TestSheetB"),
+                Arrays.asList(4, 3));
+    }
+
+    @Test
+    void testNonExcel() throws IOException {
+        runner.enqueue(Paths.get("src/test/resources/Unsupported.xls"));
+        runner.run();
+
+        assertTransferCounts(0, 0, 1);
+    }
+
+    @Test
+    void testWithEmptySheet() throws IOException {
+        final Path workbook = Paths.get("src/test/resources/sheetsWithEmptySheet.xlsx");
+        runner.enqueue(workbook);
+        runner.run();
+
+        assertTransferCounts(3, 1, 0);
+        assertSplitAttributes(
+                workbook.getFileName().toString(),
+                Arrays.asList("TestSheetA", "TestSheetB", "emptySheet"),
+                Arrays.asList(4, 3, 0));
+    }
+
+    @Test
+    void testDataWithSharedFormula() throws IOException {
+        runner.enqueue(Paths.get("src/test/resources/dataWithSharedFormula.xlsx"));
+        runner.run();
+
+        assertTransferCounts(2, 1, 0);
+    }
+
+    /** Asserts the number of FlowFiles routed to split, original and failure, in that order. */
+    private void assertTransferCounts(int splitCount, int originalCount, int failureCount) {
+        runner.assertTransferCount(SplitExcel.REL_SPLIT, splitCount);
+        runner.assertTransferCount(SplitExcel.REL_ORIGINAL, originalCount);
+        runner.assertTransferCount(SplitExcel.REL_FAILURE, failureCount);
+    }
+
+    /** Asserts the fragment, sheet name and row count attributes of every FlowFile routed to split. */
+    private void assertSplitAttributes(String parentFileName, List<String> sheetNames, List<Integer> rowCounts) {
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitExcel.REL_SPLIT);
+        final String fragmentCount = Integer.toString(splits.size());
+
+        int position = 0;
+        for (final MockFlowFile split : splits) {
+            assertNotNull(split.getAttribute(FRAGMENT_ID.key()));
+            assertEquals(Integer.toString(position), split.getAttribute(FRAGMENT_INDEX.key()));
+            assertEquals(fragmentCount, split.getAttribute(FRAGMENT_COUNT.key()));
+            assertEquals(parentFileName, split.getAttribute(SEGMENT_ORIGINAL_FILENAME.key()));
+            assertEquals(sheetNames.get(position), split.getAttribute(SplitExcel.SHEET_NAME));
+            assertEquals(rowCounts.get(position).toString(), split.getAttribute(SplitExcel.TOTAL_ROWS));
+            position++;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/dataWithSharedFormula.xlsx b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/dataWithSharedFormula.xlsx
new file mode 100644
index 0000000000..c399b640cb
Binary files /dev/null and b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/dataWithSharedFormula.xlsx differ
diff --git a/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/dates.xlsx b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/dates.xlsx
new file mode 100644
index 0000000000..3b3d145347
Binary files /dev/null and b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/dates.xlsx differ
diff --git a/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/sheetsWithEmptySheet.xlsx b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/sheetsWithEmptySheet.xlsx
new file mode 100644
index 0000000000..beafa5f08c
Binary files /dev/null and b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/sheetsWithEmptySheet.xlsx differ