This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new c473e6fa0c NIFI-11221 Removed client configuration from AbstractMongoProcessor. c473e6fa0c is described below commit c473e6fa0cf38ef4469a9c6656811eb7c8ae5591 Author: Mike Thomsen <mthom...@apache.org> AuthorDate: Sun Apr 30 09:09:06 2023 -0400 NIFI-11221 Removed client configuration from AbstractMongoProcessor. NIFI-11221 Style check fixes NIFI-11221 Removed dead code. Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #7213 --- .../apache/nifi/mongodb/MongoDBClientService.java | 5 +- .../processors/mongodb/AbstractMongoProcessor.java | 163 ++------------------- .../nifi/processors/mongodb/DeleteMongo.java | 3 +- .../apache/nifi/processors/mongodb/GetMongo.java | 2 - .../apache/nifi/processors/mongodb/PutMongo.java | 45 +----- .../nifi/processors/mongodb/PutMongoRecord.java | 8 +- .../processors/mongodb/RunMongoAggregation.java | 2 - .../mongodb/AbstractMongoProcessorTest.java | 87 ----------- .../nifi/processors/mongodb/DeleteMongoIT.java | 28 +--- .../apache/nifi/processors/mongodb/GetMongoIT.java | 21 ++- .../processors/mongodb/MongoWriteTestBase.java | 12 +- .../apache/nifi/processors/mongodb/PutMongoIT.java | 57 ++----- .../nifi/processors/mongodb/PutMongoRecordIT.java | 35 +---- .../nifi/processors/mongodb/PutMongoTest.java | 1 - .../processors/mongodb/RunMongoAggregationIT.java | 32 +--- .../nifi/mongodb/MongoDBControllerService.java | 7 +- 16 files changed, 79 insertions(+), 429 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java index 9f8893b46d..ee9f7ed08c 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java @@ -21,7 +21,6 @@ import com.mongodb.WriteConcern; import com.mongodb.client.MongoDatabase; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.VerifiableControllerService; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -123,14 +122,14 @@ public interface MongoDBClientService extends ControllerService, VerifiableContr .allowableValues(WRITE_CONCERN_ACKNOWLEDGED_VALUE, WRITE_CONCERN_UNACKNOWLEDGED_VALUE, WRITE_CONCERN_FSYNCED_VALUE, WRITE_CONCERN_JOURNALED_VALUE, WRITE_CONCERN_REPLICA_ACKNOWLEDGED_VALUE, WRITE_CONCERN_MAJORITY_VALUE, WRITE_CONCERN_W1_VALUE, WRITE_CONCERN_W2_VALUE, WRITE_CONCERN_W3_VALUE) - .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) + .defaultValue(WRITE_CONCERN_ACKNOWLEDGED_VALUE.getValue()) .build(); default Document convertJson(String query) { return Document.parse(query); } - WriteConcern getWriteConcern(final ConfigurationContext context); MongoDatabase getDatabase(String name); String getURI(); + WriteConcern getWriteConcern(); } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index dbf383ccd9..95c63bff2c 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -22,27 +22,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoClient; -import com.mongodb.WriteConcern; -import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; -import java.io.ByteArrayInputStream; -import java.io.UnsupportedEncodingException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; -import javax.net.ssl.SSLContext; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -53,10 +39,19 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.security.util.ClientAuth; -import org.apache.nifi.ssl.SSLContextService; import org.bson.Document; +import javax.net.ssl.SSLContext; +import java.io.ByteArrayInputStream; +import java.io.UnsupportedEncodingException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; + public abstract class AbstractMongoProcessor extends AbstractProcessor { static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED"; static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED"; @@ -83,15 +78,6 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .identifiesControllerService(MongoDBClientService.class) .build(); - static final PropertyDescriptor URI = new PropertyDescriptor.Builder() - .name("Mongo URI") - .displayName("Mongo URI") - .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() .name("Mongo Database Name") .displayName("Mongo Database Name") @@ -121,36 +107,6 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .required(true) .build(); - public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("ssl-context-service") - .displayName("SSL Context Service") - .description("The SSL Context Service used to provide client certificate information for TLS/SSL " - + "connections.") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); - - public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() - .name("ssl-client-auth") - .displayName("Client Auth") - .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. " - + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context " - + "has been defined and enabled.") - .required(false) - .allowableValues(ClientAuth.values()) - .defaultValue("REQUIRED") - .build(); - - public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder() - .name("Write Concern") - .displayName("Write Concern") - .description("The write concern to use") - .required(true) - .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED, - WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY, WRITE_CONCERN_W1, WRITE_CONCERN_W2, WRITE_CONCERN_W3) - .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) - .build(); - static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() .name("results-per-flowfile") .displayName("Results Per FlowFile") @@ -216,11 +172,8 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { static { List<PropertyDescriptor> _temp = new ArrayList<>(); _temp.add(CLIENT_SERVICE); - _temp.add(URI); _temp.add(DATABASE_NAME); _temp.add(COLLECTION_NAME); - _temp.add(SSL_CONTEXT_SERVICE); - _temp.add(CLIENT_AUTH); descriptors = Collections.unmodifiableList(_temp); } @@ -230,36 +183,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { @OnScheduled public final void createClient(ProcessContext context) { - if (context.getProperty(CLIENT_SERVICE).isSet()) { - clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class); - return; - } - - if (mongoClient != null) { - closeClient(); - } - - getLogger().info("Creating MongoClient"); - - // Set up the client for secure (SSL/TLS communications) if configured to do so - final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - final SSLContext sslContext; - - if (sslService != null) { - sslContext = sslService.createContext(); - } else { - sslContext = null; - } - - try { - final String uri = getURI(context); - final MongoClientSettings.Builder builder = getClientSettings(uri, sslContext); - final MongoClientSettings clientSettings = builder.build(); - mongoClient = MongoClients.create(clientSettings); - } catch (Exception e) { - getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e); - throw e; - } + clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class); } protected MongoClientSettings.Builder getClientSettings(final String uri, final SSLContext sslContext) { @@ -297,50 +221,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { } protected String getURI(final ProcessContext context) { - if (clientService != null) { - return clientService.getURI(); - } else { - return context.getProperty(URI).evaluateAttributeExpressions().getValue(); - } - } - - protected WriteConcern getWriteConcern(final ProcessContext context) { - final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue(); - WriteConcern writeConcern = null; - switch (writeConcernProperty) { - case WRITE_CONCERN_ACKNOWLEDGED: - writeConcern = WriteConcern.ACKNOWLEDGED; - break; - case WRITE_CONCERN_UNACKNOWLEDGED: - writeConcern = WriteConcern.UNACKNOWLEDGED; - break; - case WRITE_CONCERN_FSYNCED: - writeConcern = WriteConcern.JOURNALED; - getLogger().warn("Using deprecated write concern FSYNCED"); - break; - case WRITE_CONCERN_JOURNALED: - writeConcern = WriteConcern.JOURNALED; - break; - case WRITE_CONCERN_REPLICA_ACKNOWLEDGED: - writeConcern = WriteConcern.W2; - getLogger().warn("Using deprecated write concern REPLICA_ACKNOWLEDGED"); - break; - case WRITE_CONCERN_MAJORITY: - writeConcern = WriteConcern.MAJORITY; - break; - case WRITE_CONCERN_W1: - writeConcern = WriteConcern.W1; - break; - case WRITE_CONCERN_W2: - writeConcern = WriteConcern.W2; - break; - case WRITE_CONCERN_W3: - writeConcern = WriteConcern.W3; - break; - default: - writeConcern = WriteConcern.ACKNOWLEDGED; - } - return writeConcern; + return clientService.getURI(); } protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, @@ -365,22 +246,4 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { objectMapper.setDateFormat(df); } } - - @Override - protected Collection<ValidationResult> customValidate(ValidationContext context) { - List<ValidationResult> retVal = new ArrayList<>(); - - boolean clientIsSet = context.getProperty(CLIENT_SERVICE).isSet(); - boolean uriIsSet = context.getProperty(URI).isSet(); - - if (clientIsSet && uriIsSet) { - String msg = "The client service and URI fields cannot be set at the same time."; - retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build()); - } else if (!clientIsSet && !uriIsSet) { - String msg = "The client service or the URI field must be set."; - retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build()); - } - - return retVal; - } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/DeleteMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/DeleteMongo.java index 6176a66091..e0d7c5e829 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/DeleteMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/DeleteMongo.java @@ -98,7 +98,6 @@ public class DeleteMongo extends AbstractMongoProcessor { _propertyDescriptors.addAll(descriptors); _propertyDescriptors.add(DELETE_MODE); _propertyDescriptors.add(FAIL_ON_NO_DELETE); - _propertyDescriptors.add(WRITE_CONCERN); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); final Set<Relationship> _relationships = new HashSet<>(); @@ -127,7 +126,7 @@ public class DeleteMongo extends AbstractMongoProcessor { @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); - final WriteConcern writeConcern = getWriteConcern(context); + final WriteConcern writeConcern = clientService.getWriteConcern(); final String deleteMode = context.getProperty(DELETE_MODE).getValue(); final String deleteAttr = flowFile.getAttribute("mongodb.delete.mode"); final Boolean failMode = context.getProperty(FAIL_ON_NO_DELETE).asBoolean(); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 1f149dd12f..60eed117a6 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -104,8 +104,6 @@ public class GetMongo extends AbstractMongoQueryProcessor { _propertyDescriptors.add(BATCH_SIZE); _propertyDescriptors.add(RESULTS_PER_FLOWFILE); _propertyDescriptors.add(DATE_FORMAT); - _propertyDescriptors.add(SSL_CONTEXT_SERVICE); - _propertyDescriptors.add(CLIENT_AUTH); _propertyDescriptors.add(SEND_EMPTY_RESULTS); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java index 57081d751f..548ad6689d 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java @@ -19,13 +19,13 @@ package org.apache.nifi.processors.mongodb; import com.mongodb.BasicDBObject; import com.mongodb.WriteConcern; import com.mongodb.client.MongoCollection; -import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.model.ReplaceOptions; +import com.mongodb.client.model.UpdateOptions; import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; @@ -136,7 +136,6 @@ public class PutMongo extends AbstractMongoProcessor { _propertyDescriptors.add(UPDATE_QUERY_KEY); _propertyDescriptors.add(UPDATE_QUERY); _propertyDescriptors.add(UPDATE_MODE); - _propertyDescriptors.add(WRITE_CONCERN); _propertyDescriptors.add(CHARACTER_SET); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); @@ -196,7 +195,7 @@ public class PutMongo extends AbstractMongoProcessor { final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); final String mode = context.getProperty(MODE).getValue(); final String updateMode = context.getProperty(UPDATE_MODE).getValue(); - final WriteConcern writeConcern = getWriteConcern(context); + final WriteConcern writeConcern = clientService.getWriteConcern(); try { final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern); @@ -275,42 +274,4 @@ public class PutMongo extends AbstractMongoProcessor { return retVal; } - - - protected WriteConcern getWriteConcern(final ProcessContext context) { - final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue(); - WriteConcern writeConcern = null; - switch (writeConcernProperty) { - case WRITE_CONCERN_ACKNOWLEDGED: - writeConcern = WriteConcern.ACKNOWLEDGED; - break; - case WRITE_CONCERN_UNACKNOWLEDGED: - writeConcern = WriteConcern.UNACKNOWLEDGED; - break; - case WRITE_CONCERN_FSYNCED: - writeConcern = WriteConcern.JOURNALED; - break; - case WRITE_CONCERN_JOURNALED: - writeConcern = WriteConcern.JOURNALED; - break; - case WRITE_CONCERN_REPLICA_ACKNOWLEDGED: - writeConcern = WriteConcern.W2; - break; - case WRITE_CONCERN_MAJORITY: - writeConcern = WriteConcern.MAJORITY; - break; - case WRITE_CONCERN_W1: - writeConcern = WriteConcern.W1; - break; - case WRITE_CONCERN_W2: - writeConcern = WriteConcern.W2; - break; - case WRITE_CONCERN_W3: - writeConcern = WriteConcern.W3; - break; - default: - writeConcern = WriteConcern.ACKNOWLEDGED; - } - return writeConcern; - } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java index 9c3df044f6..ecbeb76e27 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java @@ -149,7 +149,6 @@ public class PutMongoRecord extends AbstractMongoProcessor { static { List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.addAll(descriptors); - _propertyDescriptors.add(WRITE_CONCERN); _propertyDescriptors.add(RECORD_READER_FACTORY); _propertyDescriptors.add(INSERT_COUNT); _propertyDescriptors.add(ORDERED); @@ -184,7 +183,7 @@ public class PutMongoRecord extends AbstractMongoProcessor { final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY) .asControllerService(RecordReaderFactory.class); - final WriteConcern writeConcern = getWriteConcern(context); + final WriteConcern writeConcern = clientService.getWriteConcern(); int ceiling = context.getProperty(INSERT_COUNT).asInteger(); int written = 0; @@ -266,10 +265,7 @@ public class PutMongoRecord extends AbstractMongoProcessor { error = true; } finally { if (!error) { - String url = clientService != null - ? clientService.getURI() - : context.getProperty(URI).evaluateAttributeExpressions().getValue(); - session.getProvenanceReporter().send(flowFile, url, String.format("Written %d documents to MongoDB.", written)); + session.getProvenanceReporter().send(flowFile, clientService.getURI(), String.format("Written %d documents to MongoDB.", written)); session.transfer(flowFile, REL_SUCCESS); getLogger().info("Written {} records into MongoDB", new Object[]{ written }); } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java index 3c4ddef098..83775c59bc 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java @@ -115,8 +115,6 @@ public class RunMongoAggregation extends AbstractMongoProcessor { _propertyDescriptors.add(BATCH_SIZE); _propertyDescriptors.add(RESULTS_PER_FLOWFILE); _propertyDescriptors.add(DATE_FORMAT); - _propertyDescriptors.add(SSL_CONTEXT_SERVICE); - _propertyDescriptors.add(CLIENT_AUTH); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); final Set<Relationship> _relationships = new HashSet<>(); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java deleted file mode 100644 index 75933a7d92..0000000000 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.mongodb; - -import com.mongodb.MongoClientSettings; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import javax.net.ssl.SSLContext; - -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class AbstractMongoProcessorTest { - - MockAbstractMongoProcessor processor; - private TestRunner testRunner; - - @BeforeEach - public void setUp() { - processor = new MockAbstractMongoProcessor(); - testRunner = TestRunners.newTestRunner(processor); - } - - @Test - public void testCreateClientWithSSL() throws Exception { - SSLContextService sslService = mock(SSLContextService.class); - SSLContext sslContext = mock(SSLContext.class); - when(sslService.getIdentifier()).thenReturn("ssl-context"); - when(sslService.createContext()).thenReturn(sslContext); - testRunner.addControllerService("ssl-context", sslService); - testRunner.enableControllerService(sslService); - testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017"); - testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context"); - testRunner.assertValid(sslService); - processor.createClient(testRunner.getProcessContext()); - assertNotNull(processor.mongoClient); - processor.mongoClient = null; - processor.createClient(testRunner.getProcessContext()); - assertNotNull(processor.mongoClient); - } - - /** - * Provides a stubbed processor instance for testing - */ - public static class MockAbstractMongoProcessor extends AbstractMongoProcessor { - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - // nothing to do - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; - } - - @Override - protected MongoClientSettings.Builder getClientSettings(final String uri, final SSLContext sslContext) { - return MongoClientSettings.builder(); - } - } - -} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java index ab79a599ee..66be5fb9d8 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java @@ -19,8 +19,6 @@ package org.apache.nifi.processors.mongodb; -import org.apache.nifi.mongodb.MongoDBClientService; -import org.apache.nifi.mongodb.MongoDBControllerService; import org.apache.nifi.util.TestRunner; import org.bson.Document; import org.junit.jupiter.api.AfterEach; @@ -55,7 +53,7 @@ public class DeleteMongoIT extends MongoWriteTestBase { } @Test - public void testDeleteOne() { + public void testDeleteOne() throws Exception { TestRunner runner = init(DeleteMongo.class); String query = "{ \"_id\": \"doc_1\" }"; runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_ONE); @@ -79,7 +77,7 @@ public class DeleteMongoIT extends MongoWriteTestBase { } @Test - public void testDeleteMany() { + public void testDeleteMany() throws Exception { TestRunner runner = init(DeleteMongo.class); String query = "{\n" + "\t\"_id\": {\n" + @@ -99,7 +97,7 @@ public class DeleteMongoIT extends MongoWriteTestBase { } @Test - public void testFailOnNoDeleteOptions() { + public void testFailOnNoDeleteOptions() throws Exception { TestRunner runner = init(DeleteMongo.class); String query = "{ \"_id\": \"doc_4\"} "; runner.enqueue(query); @@ -120,24 +118,4 @@ public class DeleteMongoIT extends MongoWriteTestBase { assertEquals(3, collection.countDocuments(Document.parse("{}")), "A document was deleted"); } - - @Test - public void testClientService() throws Exception { - MongoDBClientService clientService = new MongoDBControllerService(); - TestRunner runner = init(DeleteMongo.class); - runner.addControllerService("clientService", clientService); - runner.removeProperty(DeleteMongo.URI); - runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_MANY); - runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_CONTAINER.getConnectionString()); - runner.setProperty(DeleteMongo.CLIENT_SERVICE, "clientService"); - runner.enableControllerService(clientService); - runner.assertValid(); - - runner.enqueue("{}"); - runner.run(); - runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1); - runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0); - - assertEquals(0, collection.countDocuments()); - } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java index 3ffa90d4fd..506a9af566 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java @@ -29,6 +29,7 @@ import org.apache.nifi.mongodb.MongoDBClientService; import org.apache.nifi.mongodb.MongoDBControllerService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; @@ -73,14 +74,19 @@ public class GetMongoIT extends AbstractMongoIT { private TestRunner runner; private MongoClient mongoClient; + private MongoDBControllerService clientService; @BeforeEach - public void setup() { + public void setup() throws Exception { runner = TestRunners.newTestRunner(GetMongo.class); runner.setVariable("uri", MONGO_CONTAINER.getConnectionString()); runner.setVariable("db", DB_NAME); runner.setVariable("collection", COLLECTION_NAME); - runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); + clientService = new MongoDBControllerService(); + runner.addControllerService("clientService", clientService); + runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_CONTAINER.getConnectionString()); + runner.setProperty(GetMongo.CLIENT_SERVICE, "clientService"); + runner.enableControllerService(clientService); runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP); @@ -119,7 +125,6 @@ public class GetMongoIT extends AbstractMongoIT { assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required")); // missing query - is ok - runner.setProperty(AbstractMongoProcessor.URI, MONGO_CONTAINER.getConnectionString()); runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); runner.enqueue(new byte[0]); @@ -484,7 +489,7 @@ public class GetMongoIT extends AbstractMongoIT { * it can work against a flowfile. */ @Test - public void testDatabaseEL() { + public void testDatabaseEL() throws InitializationException { runner.clearTransferState(); runner.removeVariable("collection"); runner.removeVariable("db"); @@ -529,7 +534,12 @@ public class GetMongoIT extends AbstractMongoIT { for (Map.Entry<String, Map<String, String>> entry : vals.entrySet()) { // Creating a new runner for each set of attributes map since every subsequent runs will attempt to take the top most enqueued FlowFile tmpRunner = TestRunners.newTestRunner(GetMongo.class); - tmpRunner.setProperty(AbstractMongoProcessor.URI, MONGO_CONTAINER.getConnectionString()); + MongoDBControllerService tempService = new MongoDBControllerService(); + tmpRunner.addControllerService("clientService", tempService); + tmpRunner.setProperty(tempService, MongoDBControllerService.URI, MONGO_CONTAINER.getConnectionString()); + tmpRunner.setProperty(AbstractMongoProcessor.CLIENT_SERVICE, "clientService"); + tmpRunner.enableControllerService(tempService); + tmpRunner.setProperty(AbstractMongoProcessor.CLIENT_SERVICE, "clientService"); tmpRunner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME); tmpRunner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); tmpRunner.setIncomingConnection(true); @@ -590,7 +600,6 @@ public class GetMongoIT extends AbstractMongoIT { public void testClientService() throws Exception { MongoDBClientService clientService = new MongoDBControllerService(); runner.addControllerService("clientService", clientService); - runner.removeProperty(GetMongo.URI); runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_CONTAINER.getConnectionString()); runner.setProperty(GetMongo.CLIENT_SERVICE, "clientService"); runner.enableControllerService(clientService); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java index 6b114178c9..d6cfe51a65 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/MongoWriteTestBase.java @@ -20,6 +20,8 @@ package org.apache.nifi.processors.mongodb; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; +import org.apache.nifi.mongodb.MongoDBClientService; +import org.apache.nifi.mongodb.MongoDBControllerService; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.bson.Document; @@ -41,19 +43,25 @@ public class MongoWriteTestBase extends AbstractMongoIT { protected MongoClient mongoClient; protected MongoCollection<Document> collection; + protected MongoDBClientService clientService; public void setup(Class processor) { DATABASE_NAME = processor.getSimpleName().toLowerCase(); mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString()); collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME); + + clientService = new MongoDBControllerService(); } - public TestRunner init(Class processor) { + public TestRunner init(Class processor) throws Exception { TestRunner runner = TestRunners.newTestRunner(processor); + runner.addControllerService("clientService", clientService); + runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_CONTAINER.getConnectionString()); + runner.setProperty(AbstractMongoProcessor.CLIENT_SERVICE, "clientService"); + runner.enableControllerService(clientService); runner.setVariable("uri", MONGO_CONTAINER.getConnectionString()); runner.setVariable("db", DATABASE_NAME); runner.setVariable("collection", COLLECTION_NAME); - runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); return runner; diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java index be0271fcd8..35c5a0403d 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java @@ -18,13 +18,10 @@ package org.apache.nifi.processors.mongodb; import com.mongodb.client.MongoCursor; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.mongodb.MongoDBClientService; -import org.apache.nifi.mongodb.MongoDBControllerService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; import org.bson.Document; import org.bson.types.ObjectId; import org.junit.jupiter.api.AfterEach; @@ -60,13 +57,15 @@ public class PutMongoIT extends MongoWriteTestBase { } @Test - public void testValidators() { - TestRunner runner = TestRunners.newTestRunner(PutMongo.class); + public void testValidators() throws Exception { + TestRunner runner = init(PutMongo.class); Collection<ValidationResult> results; ProcessContext pc; // missing uri, db, collection runner.enqueue(new byte[0]); + runner.removeProperty(PutMongo.DATABASE_NAME); + runner.removeProperty(PutMongo.COLLECTION_NAME); pc = runner.getProcessContext(); results = new HashSet<>(); if (pc instanceof MockProcessContext) { @@ -77,23 +76,10 @@ public class PutMongoIT extends MongoWriteTestBase { assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required")); assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required")); - // invalid write concern - runner.setProperty(AbstractMongoProcessor.URI, MONGO_CONTAINER.getConnectionString()); runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); - runner.setProperty(PutMongo.WRITE_CONCERN, "xyz"); runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); - runner.enqueue(new byte[0]); - pc = runner.getProcessContext(); - results = new HashSet<>(); - if (pc instanceof MockProcessContext) { - results = ((MockProcessContext) pc).validate(); - } - assertEquals(1, results.size()); - assertTrue(results.iterator().next().toString().matches("'Write Concern' .* is invalid because Given value not found in allowed set .*")); - // valid write concern - runner.setProperty(PutMongo.WRITE_CONCERN, PutMongo.WRITE_CONCERN_UNACKNOWLEDGED); runner.enqueue(new byte[0]); pc = runner.getProcessContext(); results = new HashSet<>(); @@ -104,7 +90,7 @@ public class PutMongoIT extends MongoWriteTestBase { } @Test - public void testQueryAndUpdateKey() { + public void testQueryAndUpdateKey() throws Exception { TestRunner runner = init(PutMongo.class); runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); @@ -113,7 +99,7 @@ public class PutMongoIT extends MongoWriteTestBase { } @Test - public void testNoQueryAndNoUpdateKey() { + public void testNoQueryAndNoUpdateKey() throws Exception { TestRunner runner = init(PutMongo.class); runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); runner.removeProperty(PutMongo.UPDATE_QUERY); @@ -122,14 +108,14 @@ public class PutMongoIT extends MongoWriteTestBase { } @Test - public void testBlankUpdateKey() { + public void testBlankUpdateKey() throws Exception { TestRunner runner = init(PutMongo.class); runner.setProperty(PutMongo.UPDATE_QUERY_KEY, " "); runner.assertNotValid(); } @Test - public void testUpdateQuery() { + public void testUpdateQuery() throws Exception { TestRunner runner = init(PutMongo.class); Document document = new Document() .append("name", "John Smith") @@ -155,7 +141,7 @@ public class PutMongoIT extends MongoWriteTestBase { } @Test - public void testUpdateBySimpleKey() { + public void testUpdateBySimpleKey() throws Exception { TestRunner runner = init(PutMongo.class); Document document = new Document() .append("name", "John Smith") @@ -181,14 +167,14 @@ public class PutMongoIT extends MongoWriteTestBase { } @Test - public void testUpdateWithFullDocByKeys() { + public void testUpdateWithFullDocByKeys() throws Exception { TestRunner runner = init(PutMongo.class); runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name,department"); testUpdateFullDocument(runner); } @Test - public void testUpdateWithFullDocByQuery() { + public void testUpdateWithFullDocByQuery() throws Exception { TestRunner runner = init(PutMongo.class); String query = "{ \"name\": \"John Smith\"}"; runner.setProperty(PutMongo.UPDATE_QUERY, query); @@ -230,7 +216,7 @@ public class PutMongoIT extends MongoWriteTestBase { } @Test - public void testUpdateByComplexKey() { + public void testUpdateByComplexKey() throws Exception { TestRunner runner = init(PutMongo.class); Document document = new Document() .append("name", "John Smith") @@ -489,7 +475,7 @@ public class PutMongoIT extends MongoWriteTestBase { * */ @Test - public void testNiFi_4759_Regressions() { + public void testNiFi_4759_Regressions() throws Exception { TestRunner runner = init(PutMongo.class); String[] upserts = new String[]{ "{ \"_id\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }", @@ -525,21 +511,4 @@ public class PutMongoIT extends MongoWriteTestBase { index++; } } - - @Test - public void testClientService() throws Exception { - MongoDBClientService clientService = new MongoDBControllerService(); - TestRunner runner = init(PutMongo.class); - runner.addControllerService("clientService", clientService); - runner.removeProperty(PutMongo.URI); - runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_CONTAINER.getConnectionString()); - runner.setProperty(PutMongo.CLIENT_SERVICE, "clientService"); - runner.enableControllerService(clientService); - runner.assertValid(); - - runner.enqueue("{ \"msg\": \"Hello, world\" }"); - runner.run(); - runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); - runner.assertTransferCount(PutMongo.REL_FAILURE, 0); - } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java index de6d0d2d25..7353908d91 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java @@ -24,7 +24,6 @@ import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.mongodb.MongoDBClientService; import org.apache.nifi.mongodb.MongoDBControllerService; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; @@ -36,7 +35,6 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; import org.bson.Document; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -72,7 +70,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase { super.teardown(); } - private TestRunner init() throws InitializationException { + private TestRunner init() throws Exception { TestRunner runner = init(PutMongoRecord.class); runner.addControllerService("reader", recordReader); runner.enableControllerService(recordReader); @@ -82,12 +80,15 @@ public class PutMongoRecordIT extends MongoWriteTestBase { @Test public void testValidators() throws Exception { - TestRunner runner = TestRunners.newTestRunner(PutMongoRecord.class); + TestRunner runner = init(PutMongoRecord.class); runner.addControllerService("reader", recordReader); runner.enableControllerService(recordReader); Collection<ValidationResult> results; ProcessContext pc; + runner.removeProperty(PutMongoRecord.DATABASE_NAME); + runner.removeProperty(PutMongoRecord.COLLECTION_NAME); + // missing uri, db, collection, RecordReader runner.enqueue(new byte[0]); pc = runner.getProcessContext(); @@ -100,31 +101,6 @@ public class PutMongoRecordIT extends MongoWriteTestBase { assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required")); assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required")); assertTrue(it.next().toString().contains("is invalid because Record Reader is required")); - - // invalid write concern - runner.setProperty(AbstractMongoProcessor.URI, MONGO_CONTAINER.getConnectionString()); - runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME); - runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); - runner.setProperty(PutMongoRecord.RECORD_READER_FACTORY, "reader"); - runner.setProperty(PutMongoRecord.WRITE_CONCERN, "xyz"); - runner.enqueue(new byte[0]); - pc = runner.getProcessContext(); - results = new HashSet<>(); - if (pc instanceof MockProcessContext) { - results = ((MockProcessContext) pc).validate(); - } - assertEquals(1, results.size()); - assertTrue(results.iterator().next().toString().matches("'Write Concern' .* is invalid because Given value not found in allowed set .*")); - - // valid write concern - runner.setProperty(PutMongoRecord.WRITE_CONCERN, PutMongoRecord.WRITE_CONCERN_UNACKNOWLEDGED); - runner.enqueue(new byte[0]); - pc = runner.getProcessContext(); - results = new HashSet<>(); - if (pc instanceof MockProcessContext) { - results = ((MockProcessContext) pc).validate(); - } - assertEquals(0, results.size()); } @Test @@ -157,7 +133,6 @@ public class PutMongoRecordIT extends MongoWriteTestBase { */ MongoDBClientService clientService = new MongoDBControllerService(); runner.addControllerService("clientService", clientService); - runner.removeProperty(PutMongoRecord.URI); runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_CONTAINER.getConnectionString()); runner.setProperty(PutMongoRecord.CLIENT_SERVICE, "clientService"); runner.enableControllerService(clientService); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java index 7e5b460868..2059ac380e 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java @@ -38,7 +38,6 @@ public class PutMongoTest { @Test public void testQueryKeyValidation() { TestRunner runner = TestRunners.newTestRunner(PutMongo.class); - runner.setProperty(PutMongo.URI, "mongodb://localhost:27017"); runner.setProperty(PutMongo.DATABASE_NAME, "demo"); runner.setProperty(PutMongo.COLLECTION_NAME, "messages"); runner.setProperty(PutMongo.MODE, PutMongo.MODE_INSERT); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java index 5def7542a8..c9f91f4887 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; -import org.apache.nifi.mongodb.MongoDBClientService; import org.apache.nifi.mongodb.MongoDBControllerService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -55,14 +54,19 @@ public class RunMongoAggregationIT extends AbstractMongoIT { private MongoClient mongoClient; private Map<String, Integer> mappings; private Calendar now = Calendar.getInstance(); + private MongoDBControllerService clientService; @BeforeEach - public void setup() { + public void setup() throws Exception { runner = TestRunners.newTestRunner(RunMongoAggregation.class); runner.setVariable("uri", MONGO_CONTAINER.getConnectionString()); runner.setVariable("db", DB_NAME); runner.setVariable("collection", COLLECTION_NAME); - runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); + clientService = new MongoDBControllerService(); + runner.addControllerService("clientService", clientService); + runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_CONTAINER.getConnectionString()); + runner.setProperty(AbstractMongoProcessor.CLIENT_SERVICE, "clientService"); + runner.enableControllerService(clientService); runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); runner.setProperty(RunMongoAggregation.QUERY_ATTRIBUTE, AGG_ATTR); @@ -218,28 +222,6 @@ public class RunMongoAggregationIT extends AbstractMongoIT { } } - @Test - public void testClientService() throws Exception { - MongoDBClientService clientService = new MongoDBControllerService(); - runner.addControllerService("clientService", clientService); - runner.removeProperty(RunMongoAggregation.URI); - runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_CONTAINER.getConnectionString()); - runner.setProperty(RunMongoAggregation.CLIENT_SERVICE, "clientService"); - runner.setProperty(RunMongoAggregation.QUERY, "[\n" + - " {\n" + - " \"$project\": {\n" + - " \"_id\": 0,\n" + - " \"val\": 1\n" + - " }\n" + - " }]"); - runner.enableControllerService(clientService); - runner.assertValid(); - - runner.enqueue("{}"); - runner.run(); - runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 9); - } - @Test public void testExtendedJsonSupport() throws Exception { String pattern = "yyyy-MM-dd'T'HH:mm:ss'Z'"; diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java index cefc6b8dd4..dbcc4988f6 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java @@ -66,9 +66,11 @@ public class MongoDBControllerService extends AbstractControllerService implemen descriptors.add(DB_PASSWORD); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(CLIENT_AUTH); + descriptors.add(WRITE_CONCERN); } protected MongoClient mongoClient; + private String writeConcernProperty; // TODO: Remove duplicate code by refactoring shared method to accept PropertyContext protected MongoClient createClient(ConfigurationContext context, MongoClient existing) { @@ -78,6 +80,8 @@ public class MongoDBControllerService extends AbstractControllerService implemen getLogger().info("Creating MongoClient"); + writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue(); + // Set up the client for secure (SSL/TLS communications) if configured to do so final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final SSLContext sslContext; @@ -138,8 +142,7 @@ public class MongoDBControllerService extends AbstractControllerService implemen } @Override - public WriteConcern getWriteConcern(final ConfigurationContext context) { - final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue(); + public WriteConcern getWriteConcern() { WriteConcern writeConcern = null; switch (writeConcernProperty) { case WRITE_CONCERN_ACKNOWLEDGED: