exceptionfactory commented on code in PR #8839: URL: https://github.com/apache/nifi/pull/8839#discussion_r1605415077
########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/AbstractCouchbaseLookupService.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.reporting.InitializationException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COLLECTION_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; + +public class AbstractCouchbaseLookupService extends AbstractControllerService { + + protected static final String KEY = "key"; + 
protected static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet())); Review Comment: It looks like this can be simplified to `Set.of(KEY)` ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.couchbase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.couchbase.client.core.error.CouchbaseException; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; + +/** + * Provides a centralized Couchbase connection and bucket passwords management. + */ +@CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management." + + " Bucket passwords can be specified via dynamic properties.") +@Tags({ "nosql", "couchbase", "database", "connection" }) +@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password", + description = "Specify bucket password if necessary." 
+ + " Couchbase Server 5.0 or later should use 'User Name' and 'User Password' instead.") +@DeprecationNotice(reason = "This component is deprecated and will be removed in NiFi 2.x.") +public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService { + + public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor + .Builder() + .name("Connection String") + .description("The hostnames or ip addresses of the bootstraping nodes and optional parameters." + + " Syntax) couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USER_NAME = new PropertyDescriptor + .Builder() + .name("user-name") + .displayName("User Name") + .description("The user name to authenticate NiFi as a Couchbase client." + + " This configuration can be used against Couchbase Server 5.0 or later" + + " supporting Roll-Based Access Control.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USER_PASSWORD = new PropertyDescriptor + .Builder() + .name("user-password") + .displayName("User Password") + .description("The user password to authenticate NiFi as a Couchbase client." + + " This configuration can be used against Couchbase Server 5.0 or later" + + " supporting Roll-Based Access Control.") + .required(false) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) Review Comment: This should be removed following general conventions against environment-based sensitive properties. 
```suggestion ``` ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseKeyValueLookupService.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.couchbase; + +import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.core.error.DocumentNotFoundException; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.kv.LookupInSpec; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.StringLookupService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.StringUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.LOOKUP_SUB_DOC_PATH; + +@Tags({"lookup", "enrich", "key", "value", "couchbase"}) +@CapabilityDescription("Lookup a string value from Couchbase Server associated with the specified key." 
+ + " The coordinates that are passed to the lookup must contain the key 'key'.") +@DeprecationNotice(reason = "This component is deprecated and will be removed in NiFi 2.x.") +public class CouchbaseKeyValueLookupService extends AbstractCouchbaseLookupService implements StringLookupService { + + private volatile String subDocPath; + + @Override + protected void addProperties(List<PropertyDescriptor> properties) { + properties.add(LOOKUP_SUB_DOC_PATH); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + super.onEnabled(context); + subDocPath = context.getProperty(LOOKUP_SUB_DOC_PATH).evaluateAttributeExpressions().getValue(); + } + + @Override + public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException { + + try { + final Bucket bucket = couchbaseClusterService.openBucket(bucketName); + final Collection collection = bucket.collection(collectionName); + final Optional<String> docId = Optional.ofNullable(coordinates.get(KEY)).map(Object::toString); + + if (!StringUtils.isBlank(subDocPath)) { + return docId.map(key -> { + try { + return collection.lookupIn(key, Collections.singletonList(LookupInSpec.get(subDocPath))); + } catch (DocumentNotFoundException e) { + getLogger().debug("Document was not found for {}", new Object[]{key}); Review Comment: ```suggestion getLogger().debug("Document was not found for {}", key); ``` ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.couchbase.client.core.error.CouchbaseException; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; + +/** + * Provides a centralized Couchbase connection and bucket passwords management. + */ +@CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management." 
+ + " Bucket passwords can be specified via dynamic properties.") +@Tags({ "nosql", "couchbase", "database", "connection" }) +@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password", + description = "Specify bucket password if necessary." + + " Couchbase Server 5.0 or later should use 'User Name' and 'User Password' instead.") +@DeprecationNotice(reason = "This component is deprecated and will be removed in NiFi 2.x.") +public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService { + + public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor + .Builder() + .name("Connection String") + .description("The hostnames or ip addresses of the bootstraping nodes and optional parameters." + + " Syntax) couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USER_NAME = new PropertyDescriptor + .Builder() + .name("user-name") + .displayName("User Name") + .description("The user name to authenticate NiFi as a Couchbase client." + + " This configuration can be used against Couchbase Server 5.0 or later" + + " supporting Roll-Based Access Control.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USER_PASSWORD = new PropertyDescriptor + .Builder() + .name("user-password") + .displayName("User Password") + .description("The user password to authenticate NiFi as a Couchbase client." 
+ + " This configuration can be used against Couchbase Server 5.0 or later" + + " supporting Roll-Based Access Control.") + .required(false) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> properties; + + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(CONNECTION_STRING); + props.add(USER_NAME); + props.add(USER_PASSWORD); + + properties = Collections.unmodifiableList(props); + } + + private static final String DYNAMIC_PROP_BUCKET_PASSWORD = "Bucket Password for "; + + private Map<String, String> bucketPasswords; + private volatile Cluster cluster; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( + String propertyDescriptorName) { + if (propertyDescriptorName.startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)) { + return new PropertyDescriptor + .Builder().name(propertyDescriptorName) + .description("Bucket password.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) Review Comment: ```suggestion ``` ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.couchbase.client.core.error.CouchbaseException; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; + +/** + * Provides a centralized Couchbase connection and bucket passwords management. + */ +@CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management." 
+ + " Bucket passwords can be specified via dynamic properties.") +@Tags({ "nosql", "couchbase", "database", "connection" }) +@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password", + description = "Specify bucket password if necessary." + + " Couchbase Server 5.0 or later should use 'User Name' and 'User Password' instead.") +@DeprecationNotice(reason = "This component is deprecated and will be removed in NiFi 2.x.") +public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService { + + public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor + .Builder() + .name("Connection String") + .description("The hostnames or ip addresses of the bootstraping nodes and optional parameters." + + " Syntax) couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USER_NAME = new PropertyDescriptor + .Builder() + .name("user-name") + .displayName("User Name") + .description("The user name to authenticate NiFi as a Couchbase client." + + " This configuration can be used against Couchbase Server 5.0 or later" + + " supporting Roll-Based Access Control.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USER_PASSWORD = new PropertyDescriptor + .Builder() + .name("user-password") + .displayName("User Password") + .description("The user password to authenticate NiFi as a Couchbase client." 
+ + " This configuration can be used against Couchbase Server 5.0 or later" + + " supporting Roll-Based Access Control.") + .required(false) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> properties; + + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(CONNECTION_STRING); + props.add(USER_NAME); + props.add(USER_PASSWORD); + + properties = Collections.unmodifiableList(props); + } + + private static final String DYNAMIC_PROP_BUCKET_PASSWORD = "Bucket Password for "; + + private Map<String, String> bucketPasswords; + private volatile Cluster cluster; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( + String propertyDescriptorName) { + if (propertyDescriptorName.startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)) { + return new PropertyDescriptor + .Builder().name(propertyDescriptorName) + .description("Bucket password.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + } + return null; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext context) { + final Collection<ValidationResult> results = new ArrayList<>(); + + final boolean isUserNameSet = context.getProperty(USER_NAME).isSet(); + final boolean isUserPasswordSet = context.getProperty(USER_PASSWORD).isSet(); + if ((isUserNameSet && !isUserPasswordSet) || (!isUserNameSet && isUserPasswordSet)) { + results.add(new ValidationResult.Builder() + .subject("User Name and Password") + .explanation("Both User Name and Password are required to use.") + .build()); + } Review Comment: This could be removed and replaced with an 
`Authentication Strategy` property, where Username and Password are both required with a selected strategy. ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseUtils.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import com.couchbase.client.core.deps.io.netty.buffer.Unpooled; +import com.couchbase.client.core.error.DecodingFailureException; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.json.JsonArray; +import com.couchbase.client.java.json.JsonObject; +import com.couchbase.client.java.kv.GetResult; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestCouchbaseUtils { Review Comment: This class should be removed, or changed to use Testcontainers. 
########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java: ########## @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import com.couchbase.client.core.Core; +import com.couchbase.client.core.CoreContext; +import com.couchbase.client.core.cnc.tracing.NoopRequestSpan; +import com.couchbase.client.core.env.CoreEnvironment; +import com.couchbase.client.core.env.PasswordAuthenticator; +import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.core.error.DurabilityImpossibleException; +import com.couchbase.client.core.error.ServiceNotAvailableException; +import com.couchbase.client.core.error.context.KeyValueErrorContext; +import com.couchbase.client.core.io.CollectionIdentifier; +import com.couchbase.client.core.io.netty.kv.MemcacheProtocol; +import com.couchbase.client.core.msg.ResponseStatus; +import com.couchbase.client.core.msg.kv.GetRequest; +import com.couchbase.client.core.retry.BestEffortRetryStrategy; +import com.couchbase.client.core.util.ConnectionString; +import com.couchbase.client.java.Bucket; +import 
com.couchbase.client.java.Collection; +import com.couchbase.client.java.kv.MutationResult; +import com.couchbase.client.java.kv.PersistTo; +import com.couchbase.client.java.kv.ReplicateTo; +import com.couchbase.client.java.kv.UpsertOptions; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.couchbase.DocumentType; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS; +import static org.apache.nifi.processors.couchbase.CouchbaseAttributes.Exception; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static 
org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class TestPutCouchbaseKey { + + private static final String SERVICE_ID = "couchbaseClusterService"; + private TestRunner testRunner; + + private final CoreEnvironment coreEnvironment = CoreEnvironment.builder().build(); + private final PasswordAuthenticator passwordAuthenticator = + new PasswordAuthenticator.Builder().username("couchbase").password("b1password").build(); + private final ConnectionString connectionString = ConnectionString.create("couchbase://192.168.99.100"); + private final Core core = Core.create(coreEnvironment, passwordAuthenticator, connectionString); + private final CoreContext coreContext = new CoreContext(core, 1, coreEnvironment, passwordAuthenticator); + + @BeforeEach + public void init() throws Exception { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestPutCouchbaseKey", "debug"); Review Comment: ```suggestion ``` ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java: ########## @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import com.couchbase.client.core.Core; +import com.couchbase.client.core.CoreContext; +import com.couchbase.client.core.cnc.tracing.NoopRequestSpan; +import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf; +import com.couchbase.client.core.deps.io.netty.buffer.Unpooled; +import com.couchbase.client.core.env.CoreEnvironment; +import com.couchbase.client.core.env.PasswordAuthenticator; +import com.couchbase.client.core.error.AmbiguousTimeoutException; +import com.couchbase.client.core.error.AuthenticationFailureException; +import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.core.error.DocumentNotFoundException; +import com.couchbase.client.core.error.DurabilityImpossibleException; +import com.couchbase.client.core.error.InvalidRequestException; +import com.couchbase.client.core.error.ServiceNotAvailableException; +import com.couchbase.client.core.error.context.CancellationErrorContext; +import com.couchbase.client.core.error.context.GenericRequestErrorContext; +import com.couchbase.client.core.error.context.KeyValueErrorContext; +import com.couchbase.client.core.io.CollectionIdentifier; +import com.couchbase.client.core.io.netty.kv.MemcacheProtocol; +import com.couchbase.client.core.msg.ResponseStatus; +import com.couchbase.client.core.msg.kv.GetRequest; +import com.couchbase.client.core.retry.BestEffortRetryStrategy; +import com.couchbase.client.core.util.ConnectionString; +import com.couchbase.client.java.Bucket; +import 
com.couchbase.client.java.Collection; +import com.couchbase.client.java.kv.GetOptions; +import com.couchbase.client.java.kv.GetResult; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.couchbase.CouchbaseConfigurationProperties; +import org.apache.nifi.couchbase.DocumentType; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_ORIGINAL; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS; +import static org.apache.nifi.processors.couchbase.CouchbaseAttributes.Exception; +import static org.apache.nifi.processors.couchbase.GetCouchbaseKey.PUT_VALUE_TO_ATTRIBUTE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestGetCouchbaseKey { + + private static final String SERVICE_ID = "couchbaseClusterService"; + + private final CoreEnvironment coreEnvironment = CoreEnvironment.builder().build(); + private final PasswordAuthenticator passwordAuthenticator = + new PasswordAuthenticator.Builder().username("couchbase").password("b1password").build(); + private final ConnectionString connectionString = ConnectionString.create("couchbase://192.168.99.100"); + private final Core core = Core.create(coreEnvironment, passwordAuthenticator, connectionString); + private final CoreContext coreContext = new CoreContext(core, 1, coreEnvironment, passwordAuthenticator); + + private TestRunner testRunner; + + @BeforeEach + public void init() throws Exception { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.GetCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestGetCouchbaseKey", "debug"); Review Comment: ```suggestion ``` ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html: ########## @@ -0,0 +1,35 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>CouchbaseMapCacheClient</title> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>CouchbaseMapCacheClient</h2> + +<h3>Requirements</h3> + +<h4>Couchbase Server 4.0 or higher is required for some operation using N1QL</h4> + +Following cache operations require N1QL query, thus you need to deploy Couchbase Server 4.0 or higher for those operations. However, as of this writing (May 2017) there are only few processors using these operations. Most cache APIs are implemented using document id lookup and should work with older version of Couchbase Server. Review Comment: This description should be updated to reflect more recent behavior. ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java: ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import com.couchbase.client.core.error.CasMismatchException; +import com.couchbase.client.core.error.DocumentExistsException; +import com.couchbase.client.core.error.DocumentNotFoundException; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.codec.RawBinaryTranscoder; +import com.couchbase.client.java.kv.GetOptions; +import com.couchbase.client.java.kv.GetResult; +import com.couchbase.client.java.kv.InsertOptions; +import com.couchbase.client.java.kv.ReplaceOptions; +import com.couchbase.client.java.kv.UpsertOptions; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; +import static 
org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COLLECTION_NAME; +import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; + +@Tags({"distributed", "cache", "map", "cluster", "couchbase"}) +@CapabilityDescription("Provides the ability to communicate with a Couchbase Server cluster as a DistributedMapCacheServer." + + " This can be used in order to share a Map between nodes in a NiFi cluster." + + " Couchbase Server cluster can provide a high available and persistent cache storage.") +public class CouchbaseMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + private Collection collection; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(COUCHBASE_CLUSTER_SERVICE); + descriptors.add(BUCKET_NAME); + descriptors.add(COLLECTION_NAME); + return descriptors; Review Comment: This list should be defined statically and returned instead of built in every method invocation. ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.couchbase.client.core.error.CouchbaseException; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; + +/** + * Provides a centralized Couchbase connection and bucket passwords management. + */ +@CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management." + + " Bucket passwords can be specified via dynamic properties.") +@Tags({ "nosql", "couchbase", "database", "connection" }) +@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password", + description = "Specify bucket password if necessary." 
+ + " Couchbase Server 5.0 or later should use 'User Name' and 'User Password' instead.") +@DeprecationNotice(reason = "This component is deprecated and will be removed in NiFi 2.x.") +public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService { + + public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor + .Builder() + .name("Connection String") + .description("The hostnames or ip addresses of the bootstraping nodes and optional parameters." + + " Syntax) couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USER_NAME = new PropertyDescriptor + .Builder() + .name("user-name") + .displayName("User Name") + .description("The user name to authenticate NiFi as a Couchbase client." + + " This configuration can be used against Couchbase Server 5.0 or later" + + " supporting Roll-Based Access Control.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USER_PASSWORD = new PropertyDescriptor + .Builder() + .name("user-password") + .displayName("User Password") + .description("The user password to authenticate NiFi as a Couchbase client." 
+ + " This configuration can be used against Couchbase Server 5.0 or later" + + " supporting Roll-Based Access Control.") + .required(false) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> properties; + + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(CONNECTION_STRING); + props.add(USER_NAME); + props.add(USER_PASSWORD); + + properties = Collections.unmodifiableList(props); + } Review Comment: This can be condensed using `List.of()` ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseClusterService.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.couchbase; + +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertThrows; + + +public class TestCouchbaseClusterService { + + private static final String SERVICE_ID = "couchbaseClusterService"; + private TestRunner testRunner; + + public static class SampleProcessor extends AbstractProcessor { + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + } + } + + @BeforeEach + public void init() { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.CouchbaseClusterService", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.TestCouchbaseClusterService", "debug"); Review Comment: These settings should be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org