This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new b3880a4 NIFI-5970 Handle multiple input FlowFiles at Put.initConnection b3880a4 is described below commit b3880a4a067915c56aa3d1b602717eab7ffa02fb Author: Koji Kawamura <ijokaruma...@apache.org> AuthorDate: Wed Jul 17 11:28:37 2019 +0900 NIFI-5970 Handle multiple input FlowFiles at Put.initConnection Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #3583 --- .../processor/util/pattern/PartialFunctions.java | 2 +- .../apache/nifi/processor/util/pattern/Put.java | 4 +- .../org/apache/nifi/processors/hive/PutHiveQL.java | 4 +- .../apache/nifi/processors/hive/PutHive3QL.java | 2 +- .../apache/nifi/processors/hive/PutHive_1_1QL.java | 4 +- .../processors/standard/PutDatabaseRecord.java | 4 +- .../apache/nifi/processors/standard/PutSQL.java | 31 ++++++++-- .../nifi/processors/standard/TestPutSQL.java | 71 ++++++++++++++++++++++ .../java/org/apache/nifi/dbcp/DBCPService.java | 47 ++++++++++++++ .../apache/nifi/dbcp/DBCPConnectionPoolLookup.java | 20 +++++- .../nifi/dbcp/TestDBCPConnectionPoolLookup.java | 51 ++++++++++++++++ 11 files changed, 223 insertions(+), 17 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java index 7b969b0..9c27200 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java @@ -32,7 +32,7 @@ public class PartialFunctions { @FunctionalInterface public interface InitConnection<FC, C> { - C apply(ProcessContext context, ProcessSession session, FC functionContext, FlowFile flowFile) throws ProcessException; + C apply(ProcessContext context, ProcessSession session, FC functionContext, List<FlowFile> flowFiles) throws ProcessException; } @FunctionalInterface diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java index 80b8088..bcea833 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java @@ -93,8 +93,8 @@ public class Put<FC, C extends AutoCloseable> { return; } - // Only pass in a flow file if there is a single one present - try (C connection = initConnection.apply(context, session, functionContext, flowFiles.size() == 1 ? flowFiles.get(0) : null)) { + // Pass the FlowFiles to initialize a connection + try (C connection = initConnection.apply(context, session, functionContext, flowFiles)) { try { // Execute the core function. diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java index 943e288..e1aeade 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java @@ -204,9 +204,9 @@ public class PutHiveQL extends AbstractHiveQLProcessor { } } - private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> { + private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ffs) -> { final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class); - final Connection connection = dbcpService.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes()); + final Connection connection = dbcpService.getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes()); fc.connectionUrl = dbcpService.getConnectionURL(); return connection; }; diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java index 7a5b389..7d137e7 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java @@ -205,7 +205,7 @@ public class PutHive3QL extends AbstractHive3QLProcessor { } } - private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> { + private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ffs) -> { final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class); final Connection connection = dbcpService.getConnection(); fc.connectionUrl = dbcpService.getConnectionURL(); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/PutHive_1_1QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/PutHive_1_1QL.java index d571789..d337f0d 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/PutHive_1_1QL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/PutHive_1_1QL.java @@ -204,9 +204,9 @@ public class PutHive_1_1QL extends AbstractHive_1_1QLProcessor { } } - private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> { + private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ffs) -> { final Hive_1_1DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive_1_1DBCPService.class); - final Connection connection = dbcpService.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes()); + final Connection connection = dbcpService.getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes()); fc.connectionUrl = dbcpService.getConnectionURL(); return connection; }; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 8b4ad78..55adabb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -358,9 +358,9 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { .build(); } - private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ff) -> { + private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ffs) -> { final Connection connection = c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class) - .getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes()); + .getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes()); try { fc.originalAutoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 6a4e3a6..8c44f96 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -275,9 +275,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor { return poll.getFlowFiles(); }; - private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ff) -> { + private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ffs) -> { final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class) - .getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes()); + .getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes()); try { fc.originalAutoCommit = connection.getAutoCommit(); final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean(); @@ -621,13 +621,18 @@ public class PutSQL extends AbstractSessionFactoryProcessor { boolean fragmentedTransaction = false; final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final FlowFileFilter dbcpServiceFlowFileFilter = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class).getFlowFileFilter(batchSize); List<FlowFile> flowFiles; if (useTransactions) { - final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter(); + final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter(dbcpServiceFlowFileFilter); flowFiles = session.get(filter); fragmentedTransaction = filter.isFragmentedTransaction(); } else { - flowFiles = session.get(batchSize); + if (dbcpServiceFlowFileFilter == null) { + flowFiles = session.get(batchSize); + } else { + flowFiles = session.get(dbcpServiceFlowFileFilter); + } } if (flowFiles.isEmpty()) { @@ -804,14 +809,28 @@ public class PutSQL extends AbstractSessionFactoryProcessor { * across multiple FlowFiles) or that none of the FlowFiles belongs to a fragmented transaction */ static class TransactionalFlowFileFilter implements FlowFileFilter { + private final FlowFileFilter nonFragmentedTransactionFilter; private String selectedId = null; private int numSelected = 0; private boolean ignoreFragmentIdentifiers = false; + public TransactionalFlowFileFilter(FlowFileFilter nonFragmentedTransactionFilter) { + this.nonFragmentedTransactionFilter = nonFragmentedTransactionFilter; + } + public boolean isFragmentedTransaction() { return !ignoreFragmentIdentifiers; } + private FlowFileFilterResult filterNonFragmentedTransaction(final FlowFile flowFile) { + if (nonFragmentedTransactionFilter == null) { + return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + } else { + // Use non-fragmented tx filter for further filtering. + return nonFragmentedTransactionFilter.filter(flowFile); + } + } + @Override public FlowFileFilterResult filter(final FlowFile flowFile) { final String fragmentId = flowFile.getAttribute(FRAGMENT_ID_ATTR); @@ -821,7 +840,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { // we accept any FlowFile that is also not part of a fragmented transaction. if (ignoreFragmentIdentifiers) { if (fragmentId == null || "1".equals(fragCount)) { - return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + return filterNonFragmentedTransaction(flowFile); } else { return FlowFileFilterResult.REJECT_AND_CONTINUE; } @@ -831,7 +850,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { if (selectedId == null) { // Only one FlowFile in the transaction. ignoreFragmentIdentifiers = true; - return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + return filterNonFragmentedTransaction(flowFile); } else { // we've already selected 1 FlowFile, and this one doesn't match. return FlowFileFilterResult.REJECT_AND_CONTINUE; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index b804447..5d155ee 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -16,6 +16,9 @@ */ package org.apache.nifi.processors.standard; +import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE; +import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE; +import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -38,6 +41,7 @@ import java.time.LocalTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -47,6 +51,7 @@ import javax.xml.bind.DatatypeConverter; import org.apache.commons.lang3.RandomUtils; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.reporting.InitializationException; @@ -1458,6 +1463,72 @@ public class TestPutSQL { } } + private Map<String, String> createFragmentedTransactionAttributes(String id, int count, int index) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("fragment.identifier", id); + attributes.put("fragment.count", String.valueOf(count)); + attributes.put("fragment.index", String.valueOf(index)); + return attributes; + } + + @Test + public void testTransactionalFlowFileFilter() { + final MockFlowFile ff0 = new MockFlowFile(0); + final MockFlowFile ff1 = new MockFlowFile(1); + final MockFlowFile ff2 = new MockFlowFile(2); + final MockFlowFile ff3 = new MockFlowFile(3); + final MockFlowFile ff4 = new MockFlowFile(4); + + ff0.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 0)); + ff1.putAttributes(Collections.singletonMap("accept", "false")); + ff2.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 1)); + ff3.putAttributes(Collections.singletonMap("accept", "true")); + ff4.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 2)); + + // TEST 1: Fragmented TX with null service filter + // Even if the controller service does not have filtering rule, tx filter should work. + FlowFileFilter txFilter = new PutSQL.TransactionalFlowFileFilter(null); + // Should perform a fragmented tx. + assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff0)); + assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1)); + assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff2)); + assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff3)); + assertEquals(ACCEPT_AND_TERMINATE, txFilter.filter(ff4)); + + // TEST 2: Non-Fragmented TX with null service filter + txFilter = new PutSQL.TransactionalFlowFileFilter(null); + // Should perform a non-fragmented tx. + assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff1)); + assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff0)); + assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff2)); + assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff3)); + assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff4)); + + + final FlowFileFilter nonTxFilter = flowFile -> "true".equals(flowFile.getAttribute("accept")) + ? ACCEPT_AND_CONTINUE + : REJECT_AND_CONTINUE; + + // TEST 3: Fragmented TX with a service filter + // Even if the controller service does not have filtering rule, tx filter should work. + txFilter = new PutSQL.TransactionalFlowFileFilter(nonTxFilter); + // Should perform a fragmented tx. The nonTxFilter doesn't affect in this case. + assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff0)); + assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1)); + assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff2)); + assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff3)); + assertEquals(ACCEPT_AND_TERMINATE, txFilter.filter(ff4)); + + // TEST 4: Non-Fragmented TX with a service filter + txFilter = new PutSQL.TransactionalFlowFileFilter(nonTxFilter); + // Should perform a non-fragmented tx and use the nonTxFilter. + assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1)); + assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff0)); + assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff2)); + assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff3)); + assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff4)); + } + /** * Simple implementation only for testing purposes */ diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java index ffc9b3a..3c88f83 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java @@ -18,12 +18,18 @@ package org.apache.nifi.dbcp; import java.sql.Connection; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; import org.apache.nifi.processor.exception.ProcessException; +import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE; +import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE; + /** * Definition for Database Connection Pooling Service. * @@ -48,4 +54,45 @@ public interface DBCPService extends ControllerService { // without attributes return getConnection(); } + + /** + * Implementation classes should override this method to provide DBCPService specific FlowFile filtering rule. + * For example, when processing multiple incoming FlowFiles at the same time, every FlowFile should have the same attribute value. + * Components using this service and also accepting multiple incoming FlowFiles should use + * the FlowFileFilter returned by this method to get target FlowFiles from a process session. + * @return a FlowFileFilter or null if no service specific filtering is required + */ + default FlowFileFilter getFlowFileFilter() { + return null; + } + + /** + * An utility default method to composite DBCPService specific filtering provided by {@link #getFlowFileFilter()} and batch size limitation. + * Implementation classes do not have to override this method. Instead, override {@link #getFlowFileFilter()} to provide service specific filtering. + * Components using this service and also accepting multiple incoming FlowFiles should use + * the FlowFileFilter returned by this method to get target FlowFiles from a process session. + * @param batchSize the maximum number of FlowFiles to accept + * @return a composited FlowFileFilter having service specific filtering and batch size limitation, or null if no service specific filtering is required. + */ + default FlowFileFilter getFlowFileFilter(int batchSize) { + final FlowFileFilter filter = getFlowFileFilter(); + if (filter == null) { + return null; + } + + final AtomicInteger count = new AtomicInteger(0); + return flowFile -> { + if (count.get() >= batchSize) { + return REJECT_AND_TERMINATE; + } + + final FlowFileFilterResult result = filter.filter(flowFile); + if (ACCEPT_AND_CONTINUE.equals(result)) { + count.incrementAndGet(); + return ACCEPT_AND_CONTINUE; + } else { + return result; + } + }; + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java index b0fb964..978c912 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java @@ -16,16 +16,21 @@ */ package org.apache.nifi.dbcp; - +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup; import java.sql.Connection; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE; +import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE; @Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" }) @CapabilityDescription("Provides a DBCPService that can be used to dynamically select another DBCPService. This service " + @@ -61,4 +66,17 @@ public class DBCPConnectionPoolLookup public Connection getConnection(Map<String, String> attributes) { return lookupService(attributes).getConnection(attributes); } + + @Override + public FlowFileFilter getFlowFileFilter() { + final AtomicReference<String> ref = new AtomicReference<>(); + return flowFile -> { + final String flowFileDBName = flowFile.getAttribute(DATABASE_NAME_ATTRIBUTE); + if (StringUtils.isEmpty(flowFileDBName)) { + throw new ProcessException("FlowFile attributes must contain an attribute name '" + DATABASE_NAME_ATTRIBUTE + "'"); + } + final String databaseName = ref.compareAndSet(null, flowFileDBName) ? flowFileDBName : ref.get(); + return flowFileDBName.equals(databaseName) ? ACCEPT_AND_CONTINUE : REJECT_AND_CONTINUE; + }; + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java index d02437f..978907e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java @@ -17,14 +17,17 @@ package org.apache.nifi.dbcp; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Before; import org.junit.Test; import java.sql.Connection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -144,6 +147,54 @@ public class TestDBCPConnectionPoolLookup { runner.assertNotValid(dbcpLookupService); } + @Test + public void testFlowFileFiltering() { + final FlowFileFilter filter = dbcpLookupService.getFlowFileFilter(); + assertNotNull(filter); + + final MockFlowFile ff0 = new MockFlowFile(0); + final MockFlowFile ff1 = new MockFlowFile(1); + final MockFlowFile ff2 = new MockFlowFile(2); + final MockFlowFile ff3 = new MockFlowFile(3); + final MockFlowFile ff4 = new MockFlowFile(4); + + ff0.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A")); + ff1.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B")); + ff2.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A")); + ff3.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B")); + ff4.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A")); + + assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff0)); + assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, filter.filter(ff1)); + assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff2)); + assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, filter.filter(ff3)); + assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff4)); + } + + @Test + public void testFlowFileFilteringWithBatchSize() { + final FlowFileFilter filter = dbcpLookupService.getFlowFileFilter(2); + assertNotNull(filter); + + final MockFlowFile ff0 = new MockFlowFile(0); + final MockFlowFile ff1 = new MockFlowFile(1); + final MockFlowFile ff2 = new MockFlowFile(2); + final MockFlowFile ff3 = new MockFlowFile(3); + final MockFlowFile ff4 = new MockFlowFile(4); + + ff0.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A")); + ff1.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B")); + ff2.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A")); + ff3.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B")); + ff4.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A")); + + assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff0)); + assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, filter.filter(ff1)); + assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff2)); + assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE, filter.filter(ff3)); + assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE, filter.filter(ff4)); + } + /** * A mock DBCPService that will always return the passed in MockConnection. */