This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new fa1ed16 NIFI-6271, fix issue that incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set fa1ed16 is described below commit fa1ed16e2bfc389466941e3bfe36a7a6ff08403b Author: avseq1234 <sun.s...@gmail.com> AuthorDate: Sun Jul 7 21:26:16 2019 +0800 NIFI-6271, fix issue that incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set NIFI-6271, fix incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set NIFI-6271, fix incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set replace getAttribute(uuid) with getAttribute(CoreAttributes.UUID.key()) fix checkstyle violation This closes #3575. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> --- .../nifi/processors/standard/AbstractExecuteSQL.java | 12 +++++++++++- .../apache/nifi/processors/standard/TestExecuteSQL.java | 14 ++++++++++++-- .../nifi/processors/standard/TestExecuteSQLRecord.java | 9 ++++++++- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java index 212febc..700e92e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -62,6 +62,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime"; public static final String RESULTSET_INDEX = "executesql.resultset.index"; public static final 
String RESULT_ERROR_MESSAGE = "executesql.error.message"; + public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid"; public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); @@ -247,6 +248,8 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { boolean hasUpdateCount = st.getUpdateCount() != -1; + Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes(); + String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key()); while (hasResults || hasUpdateCount) { //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet if (hasResults) { @@ -262,9 +265,13 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { resultSetFF = session.create(); } else { resultSetFF = session.create(fileToProcess); - resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes()); } + if (inputFileAttrMap != null) { + resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap); + } + + try { resultSetFF = session.write(resultSetFF, out -> { try { @@ -283,6 +290,9 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed)); attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed)); attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount)); + if (inputFileUUID != null) { + attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID); + } attributesToAdd.putAll(sqlWriter.getAttributesToAdd()); resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd); sqlWriter.updateCounters(session); diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 881483e..efecbf1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -44,6 +44,7 @@ import org.apache.avro.io.DatumReader; import org.apache.commons.compress.compressors.CompressorException; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.reporting.InitializationException; @@ -303,10 +304,15 @@ public class TestExecuteSQL { stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)"); } + + Map<String, String> attrMap = new HashMap<>(); + String testAttrName = "attr1"; + String testAttrValue = "value1"; + attrMap.put(testAttrName, testAttrValue); runner.setIncomingConnection(true); runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5"); runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "1"); - runner.enqueue("SELECT * FROM TEST_NULL_INT"); + MockFlowFile inputFlowFile = runner.enqueue("SELECT * FROM TEST_NULL_INT", attrMap); runner.run(); runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200); @@ -322,9 +328,13 @@ public class TestExecuteSQL { MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199); + lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5"); 
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199"); - lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0"); + lastFlowFile.assertAttributeEquals(testAttrName, testAttrValue); + lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID, inputFlowFile.getAttribute(CoreAttributes.UUID.key())); + + } @Test diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java index 64f6aed..723f141 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java @@ -240,6 +240,11 @@ public class TestExecuteSQLRecord { stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)"); } + Map<String, String> attrMap = new HashMap<>(); + String testAttrName = "attr1"; + String testAttrValue = "value1"; + attrMap.put(testAttrName, testAttrValue); + MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); runner.addControllerService("writer", recordWriter); runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); @@ -248,7 +253,7 @@ public class TestExecuteSQLRecord { runner.setIncomingConnection(true); runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5"); runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "1"); - runner.enqueue("SELECT * FROM TEST_NULL_INT"); + MockFlowFile inputFlowFile = runner.enqueue("SELECT * FROM TEST_NULL_INT", attrMap); runner.run(); runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 200); @@ -267,6 +272,8 @@ public class TestExecuteSQLRecord { 
lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5"); lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199"); lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0"); + lastFlowFile.assertAttributeEquals(testAttrName, testAttrValue); + lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID, inputFlowFile.getAttribute(CoreAttributes.UUID.key())); } @Test