This is an automated email from the ASF dual-hosted git repository. mthomsen pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 01e72d6b51485f0ac6304448e4dee14b717e0671 Author: Bence Simon <bsi...@apache.org> AuthorDate: Wed May 3 14:12:59 2023 +0200 NIFI-11523 Refining schema handling for ScriptedTransformRecord This closes #7226 Signed-off-by: Mike Thomsen <mthom...@apache.org> --- .../processors/script/ScriptedTransformRecord.java | 5 ++-- .../script/TestScriptedTransformRecord.java | 30 +++++++++++++++------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java index 2a9637ad63..b67431cd6e 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java @@ -137,6 +137,7 @@ public class ScriptedTransformRecord extends ScriptedRecordProcessor { final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final Map<String, String> originalAttributes = flowFile.getAttributes(); final RecordCounts counts = new RecordCounts(); try { @@ -165,10 +166,8 @@ public class ScriptedTransformRecord extends ScriptedRecordProcessor { record.incorporateInactiveFields(); if (writer == null) { - final RecordSchema writerSchema; - writerSchema = record.getSchema(); - try { + final RecordSchema writerSchema = writerFactory.getSchema(originalAttributes, record.getSchema()); writer = writerFactory.createWriter(getLogger(), writerSchema, out, flowFile); } catch (SchemaNotFoundException 
e) { throw new IOException(e); diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestScriptedTransformRecord.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestScriptedTransformRecord.java index 6f6e522cd9..82e9751f2a 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestScriptedTransformRecord.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestScriptedTransformRecord.java @@ -33,6 +33,7 @@ import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -96,16 +97,17 @@ public class TestScriptedTransformRecord { } @Test - public void testAddFieldToSchema() throws InitializationException { - final RecordSchema schema = createSimpleNumberSchema(); - setup(schema); + public void testAddFieldToSchemaWhenWriterSchemaIsDefined() throws InitializationException { + final RecordSchema readSchema = createSimpleNumberSchema(); + final RecordSchema writeSchema = createSchemaWithAddedValue(); + setupWithDifferentSchemas(readSchema, writeSchema); testRunner.removeProperty(ScriptingComponentUtils.SCRIPT_BODY); testRunner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "src/test/resources/groovy/AddNewField.groovy"); - recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 1)))); - recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 2)))); - recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 3)))); + recordReader.addRecord(new MapRecord(readSchema, new HashMap<>(Collections.singletonMap("num", 1)))); + 
recordReader.addRecord(new MapRecord(readSchema, new HashMap<>(Collections.singletonMap("num", 2)))); + recordReader.addRecord(new MapRecord(readSchema, new HashMap<>(Collections.singletonMap("num", 3)))); testRunner.enqueue(new byte[0]); @@ -127,7 +129,6 @@ public class TestScriptedTransformRecord { written.forEach(record -> assertEquals(88, record.getAsInt("added-value").intValue())); } - @Test public void testZeroRecordInput() throws InitializationException { final RecordSchema schema = createSimpleNumberSchema(); @@ -484,12 +485,16 @@ public class TestScriptedTransformRecord { private void setup(final RecordSchema schema) throws InitializationException { + setupWithDifferentSchemas(schema, schema); + } + + private void setupWithDifferentSchemas(final RecordSchema readerSchema, final RecordSchema writerSchema) throws InitializationException { testRunner = TestRunners.newTestRunner(ScriptedTransformRecord.class); testRunner.setProperty(ScriptedTransformRecord.RECORD_READER, "record-reader"); testRunner.setProperty(ScriptedTransformRecord.RECORD_WRITER, "record-writer"); - recordReader = new ArrayListRecordReader(schema); - recordWriter = new ArrayListRecordWriter(schema); + recordReader = new ArrayListRecordReader(readerSchema); + recordWriter = new ArrayListRecordWriter(writerSchema); testRunner.addControllerService("record-reader", recordReader); testRunner.addControllerService("record-writer", recordWriter); @@ -507,4 +512,11 @@ public class TestScriptedTransformRecord { return schema; } + private RecordSchema createSchemaWithAddedValue() { + final RecordField recordFieldNum = new RecordField("num", RecordFieldType.INT.getDataType()); + final RecordField recordFieldAdd = new RecordField("added-value", RecordFieldType.INT.getDataType()); + final List<RecordField> recordFields = Arrays.asList(recordFieldNum, recordFieldAdd); + final RecordSchema schema = new SimpleRecordSchema(recordFields); + return schema; + } }