This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7474ad04b62 NIFI-15145 Add RecordLookup, KeyValueLookup, and
MapCacheClient Services for Couchbase (#10467)
7474ad04b62 is described below
commit 7474ad04b62b3f0674f8bacaa2828ade71b66a00
Author: Mark Bathori <[email protected]>
AuthorDate: Wed Mar 4 21:02:28 2026 +0100
NIFI-15145 Add RecordLookup, KeyValueLookup, and MapCacheClient Services
for Couchbase (#10467)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi-couchbase-nar/pom.xml | 5 +
.../couchbase/AbstractCouchbaseProcessor.java | 22 +--
.../nifi/processors/couchbase/PutCouchbase.java | 2 -
.../couchbase/AbstractCouchbaseProcessorTest.java | 41 ++++
.../processors/couchbase/GetCouchbaseTest.java | 71 +++----
.../processors/couchbase/PutCouchbaseTest.java | 48 ++---
.../nifi/services/couchbase/CouchbaseClient.java | 11 ++
...ion.java => CouchbaseCasMismatchException.java} | 10 +-
...ption.java => CouchbaseDocExistsException.java} | 10 +-
...ion.java => CouchbaseDocNotFoundException.java} | 10 +-
.../couchbase/exception/CouchbaseException.java | 2 +-
.../CouchbaseLookupInResult.java} | 12 +-
.../pom.xml | 43 ++++-
.../couchbase/AbstractCouchbaseService.java | 93 +++++++++
.../couchbase/CouchbaseKeyValueLookupService.java | 85 +++++++++
.../couchbase/CouchbaseMapCacheClient.java | 209 +++++++++++++++++++++
.../couchbase/CouchbaseRecordLookupService.java | 103 ++++++++++
.../org.apache.nifi.controller.ControllerService | 17 ++
.../couchbase/AbstractCouchbaseServiceTest.java} | 23 ++-
.../CouchbaseKeyValueLookupServiceTest.java | 98 ++++++++++
.../couchbase/CouchbaseMapCacheClientTest.java | 116 ++++++++++++
.../CouchbaseRecordLookupServiceTest.java | 112 +++++++++++
.../couchbase/StandardCouchbaseClient.java | 139 ++++++++++++--
.../StandardCouchbaseConnectionService.java | 10 +-
...uchbaseClient.java => CouchbaseClientTest.java} | 84 ++++++++-
.../nifi-couchbase-bundle/pom.xml | 1 +
26 files changed, 1220 insertions(+), 157 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
index 31482f7dadb..08cef39e101 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
@@ -32,6 +32,11 @@
<artifactId>nifi-couchbase-processors</artifactId>
<version>2.9.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-couchbase-services</artifactId>
+ <version>2.9.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-couchbase-services-api-nar</artifactId>
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
index 6d84e4e953e..ec9361e6d7a 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
@@ -50,11 +50,9 @@ import static
org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.SCO
public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
- protected CouchbaseConnectionService connectionService;
-
public static final PropertyDescriptor DOCUMENT_ID = new
PropertyDescriptor.Builder()
.name("Document ID")
- .description("Couchbase document identifier, or an expression to
construct the Couchbase document identifier.")
+ .description("Couchbase document identifier, or an expression to
construct the Couchbase document identifier")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -62,14 +60,14 @@ public abstract class AbstractCouchbaseProcessor extends
AbstractProcessor {
public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new
PropertyDescriptor.Builder()
.name("Couchbase Connection Service")
- .description("A Couchbase Connection Service which manages
connections to a Couchbase cluster.")
+ .description("Service responsible for managing connections to
Couchbase cluster")
.required(true)
.identifiesControllerService(CouchbaseConnectionService.class)
.build();
public static final PropertyDescriptor BUCKET_NAME = new
PropertyDescriptor.Builder()
.name("Bucket Name")
- .description("The name of the bucket where documents will be
stored. Each bucket contains a hierarchy of scopes and collections to group
keys and values logically.")
+ .description("The name of the bucket where documents will be
stored. Each bucket contains a hierarchy of scopes and collections to group
keys and values logically")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.defaultValue(DEFAULT_BUCKET)
@@ -78,7 +76,7 @@ public abstract class AbstractCouchbaseProcessor extends
AbstractProcessor {
public static final PropertyDescriptor SCOPE_NAME = new
PropertyDescriptor.Builder()
.name("Scope Name")
- .description("The name of the scope which is a logical namespace
within a bucket, serving to categorize and organize related collections.")
+ .description("The name of the scope which is a logical namespace
within a bucket, serving to categorize and organize related collections")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.defaultValue(DEFAULT_SCOPE)
@@ -87,7 +85,7 @@ public abstract class AbstractCouchbaseProcessor extends
AbstractProcessor {
public static final PropertyDescriptor COLLECTION_NAME = new
PropertyDescriptor.Builder()
.name("Collection Name")
- .description("The name of collection which is a logical container
within a scope, used to hold documents.")
+ .description("The name of collection which is a logical container
within a scope, used to hold documents")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.defaultValue(DEFAULT_COLLECTION)
@@ -96,7 +94,7 @@ public abstract class AbstractCouchbaseProcessor extends
AbstractProcessor {
public static final PropertyDescriptor DOCUMENT_TYPE = new
PropertyDescriptor.Builder()
.name("Document Type")
- .description("The content type for storing the document.")
+ .description("The content type for storing the document")
.required(true)
.allowableValues(DocumentType.values())
.defaultValue(DocumentType.JSON.toString())
@@ -104,17 +102,17 @@ public abstract class AbstractCouchbaseProcessor extends
AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
- .description("A FlowFile is routed to this relationship after the
data ingestion was successful.")
+ .description("A FlowFile is routed to this relationship after the
data ingestion was successful")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
- .description("A FlowFile is routed to this relationship if the
operation failed and retrying the operation will also fail, such as an invalid
data or schema.")
+ .description("A FlowFile is routed to this relationship if the
operation failed and retrying the operation will also fail, such as an invalid
data or schema")
.build();
public static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
- .description("All FlowFile that fail due to server/cluster
availability go to this relationship.")
+ .description("All FlowFile that fail due to server/cluster
availability go to this relationship")
.build();
private static final List<PropertyDescriptor> PROPERTIES = List.of(
@@ -128,6 +126,8 @@ public abstract class AbstractCouchbaseProcessor extends
AbstractProcessor {
private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS,
REL_FAILURE, REL_RETRY);
+ protected volatile CouchbaseConnectionService connectionService;
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
index a2bc22816dd..1ffe1045f1c 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
@@ -28,7 +28,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.services.couchbase.CouchbaseClient;
-import org.apache.nifi.services.couchbase.CouchbaseConnectionService;
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
@@ -69,7 +68,6 @@ public class PutCouchbase extends AbstractCouchbaseProcessor {
}
final long startNanos = System.nanoTime();
- final CouchbaseConnectionService connectionService =
context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class);
final String documentId =
context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
final CouchbaseContext couchbaseContext = getCouchbaseContext(context,
flowFile);
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java
new file mode 100644
index 00000000000..5cbf0243921
--- /dev/null
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import org.apache.nifi.services.couchbase.CouchbaseClient;
+import org.apache.nifi.services.couchbase.CouchbaseConnectionService;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractCouchbaseProcessorTest {
+
+ protected static final String SERVICE_ID = "couchbaseConnectionService";
+ protected static final String TEST_DOCUMENT_ID = "test-document-id";
+ protected static final String TEST_DOCUMENT_CONTENT =
"{\"key\":\"value\"}";
+ protected static final String TEST_SERVICE_LOCATION =
"couchbase://test-location";
+ protected static final long TEST_CAS = 1L;
+
+ protected static CouchbaseConnectionService
mockConnectionService(CouchbaseClient client) {
+ final CouchbaseConnectionService connectionService =
mock(CouchbaseConnectionService.class);
+ when(connectionService.getIdentifier()).thenReturn(SERVICE_ID);
+ when(connectionService.getClient(any())).thenReturn(client);
+
when(connectionService.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+ return connectionService;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
index 97595b7d1d4..4f535e34f9f 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
@@ -31,8 +31,8 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -60,12 +60,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-public class GetCouchbaseTest {
-
- private static final String SERVICE_ID = "couchbaseConnectionService";
- private static final String TEST_DOCUMENT_ID = "test-document-id";
- private static final String TEST_SERVICE_LOCATION =
"couchbase://test-location";
- private static final long TEST_CAS = 1L;
+public class GetCouchbaseTest extends AbstractCouchbaseProcessorTest {
private TestRunner runner;
@@ -76,20 +71,15 @@ public class GetCouchbaseTest {
@Test
public void testOnTriggerWithProvidedDocumentId() throws
CouchbaseException, InitializationException {
- final String content = "{\"key\":\"value\"}";
-
- final CouchbaseGetResult result = new
CouchbaseGetResult(content.getBytes(), TEST_CAS);
+ final CouchbaseGetResult result = new
CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.getDocument(anyString())).thenReturn(result);
- final CouchbaseConnectionService service =
mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
- when(service.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -101,7 +91,7 @@ public class GetCouchbaseTest {
runner.assertTransferCount(REL_FAILURE, 0);
final MockFlowFile outFile =
runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
- outFile.assertContentEquals(content);
+ outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, DEFAULT_BUCKET);
outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, DEFAULT_SCOPE);
outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE,
DEFAULT_COLLECTION);
@@ -116,24 +106,18 @@ public class GetCouchbaseTest {
@Test
public void testWithDocumentIdAsFlowFileAttribute() throws
CouchbaseException, InitializationException {
- final String content = "{\"key\":\"value\"}";
-
- final CouchbaseGetResult result = new
CouchbaseGetResult(content.getBytes(), TEST_CAS);
+ final CouchbaseGetResult result = new
CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.getDocument(anyString())).thenReturn(result);
- final CouchbaseConnectionService service =
mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
final MockFlowFile flowFile = new MockFlowFile(0);
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("flowfile_document_id", TEST_DOCUMENT_ID);
- flowFile.putAttributes(attributes);
+
flowFile.putAttributes(Collections.singletonMap("flowfile_document_id",
TEST_DOCUMENT_ID));
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, "${flowfile_document_id}");
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(flowFile);
@@ -145,7 +129,7 @@ public class GetCouchbaseTest {
runner.assertTransferCount(REL_FAILURE, 0);
final MockFlowFile outFile =
runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
- outFile.assertContentEquals(content);
+ outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, DEFAULT_BUCKET);
outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, DEFAULT_SCOPE);
outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE,
DEFAULT_COLLECTION);
@@ -154,24 +138,20 @@ public class GetCouchbaseTest {
@Test
public void testWithFlowFileAttributes() throws CouchbaseException,
InitializationException {
- final String documentId = "test-document-id";
- final String content = "{\"key\":\"value\"}";
final String testBucket = "test-bucket";
final String testScope = "test-scope";
final String testCollection = "test-collection";
- final CouchbaseGetResult result = new
CouchbaseGetResult(content.getBytes(), TEST_CAS);
+ final CouchbaseGetResult result = new
CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.getDocument(anyString())).thenReturn(result);
- final CouchbaseConnectionService service =
mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
- runner.setProperty(DOCUMENT_ID, documentId);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
+ runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(BUCKET_NAME, "${bucket.attribute}");
runner.setProperty(SCOPE_NAME, "${scope.attribute}");
runner.setProperty(COLLECTION_NAME, "${collection.attribute}");
@@ -181,17 +161,16 @@ public class GetCouchbaseTest {
attributes.put("bucket.attribute", testBucket);
attributes.put("scope.attribute", testScope);
attributes.put("collection.attribute", testCollection);
- final byte[] input = documentId.getBytes(StandardCharsets.UTF_8);
- runner.enqueue(input, attributes);
+ runner.enqueue(new byte[0], attributes);
runner.run();
- verify(client, times(1)).getDocument(eq(documentId));
+ verify(client, times(1)).getDocument(eq(TEST_DOCUMENT_ID));
runner.assertTransferCount(REL_SUCCESS, 1);
runner.assertTransferCount(REL_FAILURE, 0);
final MockFlowFile outFile =
runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
- outFile.assertContentEquals(content);
+ outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, testBucket);
outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, testScope);
outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, testCollection);
@@ -204,13 +183,11 @@ public class GetCouchbaseTest {
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
when(client.getDocument(anyString())).thenThrow(new
CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service =
mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
runner.run();
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
index 60cd3a114a2..320beb848eb 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
@@ -57,12 +57,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-public class PutCouchbaseTest {
-
- private static final String SERVICE_ID = "couchbaseConnectionService";
- private static final String TEST_DOCUMENT_ID = "test-document-id";
- private static final String TEST_SERVICE_LOCATION =
"couchbase://test-location";
- private static final long TEST_CAS = 1L;
+public class PutCouchbaseTest extends AbstractCouchbaseProcessorTest {
private TestRunner runner;
@@ -76,18 +71,15 @@ public class PutCouchbaseTest {
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.upsertDocument(anyString(), any())).thenReturn(new
CouchbaseUpsertResult(TEST_CAS));
- final CouchbaseConnectionService service =
mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
- when(service.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
final MockFlowFile flowFile = new MockFlowFile(0);
final Map<String, String> attributes = new HashMap<>();
attributes.put("flowfile_document_id", TEST_DOCUMENT_ID);
flowFile.putAttributes(attributes);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, "${flowfile_document_id}");
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.setValidateExpressionUsage(false);
@@ -120,12 +112,10 @@ public class PutCouchbaseTest {
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
when(client.upsertDocument(anyString(), any())).thenThrow(new
CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service =
mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -143,12 +133,10 @@ public class PutCouchbaseTest {
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.RETRY);
when(client.upsertDocument(anyString(), any())).thenThrow(new
CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service =
mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -166,12 +154,10 @@ public class PutCouchbaseTest {
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.ROLLBACK);
when(client.upsertDocument(anyString(), any())).thenThrow(new
CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service =
mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -189,12 +175,10 @@ public class PutCouchbaseTest {
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
when(client.upsertDocument(anyString(), any())).thenThrow(new
CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service =
mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
index 9ad04fbe01d..96797c1a5d2 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
@@ -19,6 +19,7 @@ package org.apache.nifi.services.couchbase;
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.exception.ExceptionCategory;
import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
public interface CouchbaseClient {
@@ -27,5 +28,15 @@ public interface CouchbaseClient {
CouchbaseUpsertResult upsertDocument(String documentId, byte[] content)
throws CouchbaseException;
+ boolean documentExists(String documentId) throws CouchbaseException;
+
+ void insertDocument(String documentId, byte[] content) throws
CouchbaseException;
+
+ void removeDocument(String documentId) throws CouchbaseException;
+
+ void replaceDocument(String documentId, byte[] content, long cas) throws
CouchbaseException;
+
+ CouchbaseLookupInResult lookupIn(String documentId, String subDocPath)
throws CouchbaseException;
+
ExceptionCategory getExceptionCategory(Throwable throwable);
}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java
similarity index 78%
copy from
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
copy to
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java
index ea2436e621f..6153f184c39 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java
@@ -16,13 +16,9 @@
*/
package org.apache.nifi.services.couchbase.exception;
-public class CouchbaseException extends Exception {
+public class CouchbaseCasMismatchException extends CouchbaseException {
- public CouchbaseException(final String message) {
- super(message);
- }
-
- public CouchbaseException(final String message, final Throwable cause) {
- super(cause);
+ public CouchbaseCasMismatchException(final String message, final Throwable
cause) {
+ super(message, cause);
}
}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java
similarity index 78%
copy from
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
copy to
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java
index ea2436e621f..044507e4ac5 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java
@@ -16,13 +16,9 @@
*/
package org.apache.nifi.services.couchbase.exception;
-public class CouchbaseException extends Exception {
+public class CouchbaseDocExistsException extends CouchbaseException {
- public CouchbaseException(final String message) {
- super(message);
- }
-
- public CouchbaseException(final String message, final Throwable cause) {
- super(cause);
+ public CouchbaseDocExistsException(final String message, final Throwable
cause) {
+ super(message, cause);
}
}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java
similarity index 78%
copy from
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
copy to
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java
index ea2436e621f..cb8090b448e 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java
@@ -16,13 +16,9 @@
*/
package org.apache.nifi.services.couchbase.exception;
-public class CouchbaseException extends Exception {
+public class CouchbaseDocNotFoundException extends CouchbaseException {
- public CouchbaseException(final String message) {
- super(message);
- }
-
- public CouchbaseException(final String message, final Throwable cause) {
- super(cause);
+ public CouchbaseDocNotFoundException(final String message, final Throwable
cause) {
+ super(message, cause);
}
}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
index ea2436e621f..600e38dc1c9 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
@@ -23,6 +23,6 @@ public class CouchbaseException extends Exception {
}
public CouchbaseException(final String message, final Throwable cause) {
- super(cause);
+ super(message, cause);
}
}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java
similarity index 73%
copy from
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
copy to
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java
index ea2436e621f..c916e29f343 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java
@@ -14,15 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.services.couchbase.exception;
+package org.apache.nifi.services.couchbase.utils;
-public class CouchbaseException extends Exception {
-
- public CouchbaseException(final String message) {
- super(message);
- }
-
- public CouchbaseException(final String message, final Throwable cause) {
- super(cause);
- }
+public record CouchbaseLookupInResult(Object resultContent, long cas) {
}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml
similarity index 52%
copy from
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
copy to
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml
index 31482f7dadb..0136ce0bdb2 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml
@@ -23,21 +23,52 @@
<version>2.9.0-SNAPSHOT</version>
</parent>
- <artifactId>nifi-couchbase-nar</artifactId>
- <packaging>nar</packaging>
+ <artifactId>nifi-couchbase-services</artifactId>
+ <packaging>jar</packaging>
<dependencies>
+ <!-- Internal dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-couchbase-processors</artifactId>
+ <artifactId>nifi-couchbase-services-api</artifactId>
<version>2.9.0-SNAPSHOT</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-couchbase-services-api-nar</artifactId>
+ <artifactId>nifi-lookup-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-json-record-utils</artifactId>
+ <version>2.9.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ <version>2.9.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
<version>2.9.0-SNAPSHOT</version>
- <type>nar</type>
+ <scope>test</scope>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java
new file mode 100644
index 00000000000..9c9d66348f7
--- /dev/null
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
+import org.apache.nifi.services.couchbase.utils.DocumentType;
+
+import java.util.Set;
+
+public abstract class AbstractCouchbaseService extends
AbstractControllerService {
+
+ protected static final String KEY = "key";
+ protected static final Set<String> REQUIRED_KEYS = Set.of(KEY);
+
+ private static final String DEFAULT_BUCKET = "default";
+ private static final String DEFAULT_SCOPE = "_default";
+ private static final String DEFAULT_COLLECTION = "_default";
+
+ public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new
PropertyDescriptor.Builder()
+ .name("Couchbase Connection Service")
+ .description("Service responsible for managing connections to
Couchbase cluster")
+ .required(true)
+ .identifiesControllerService(CouchbaseConnectionService.class)
+ .build();
+
+ public static final PropertyDescriptor BUCKET_NAME = new
PropertyDescriptor.Builder()
+ .name("Bucket Name")
+ .description("The name of the bucket where documents will be
stored. Each bucket contains a hierarchy of scopes and collections to group
keys and values logically")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue(DEFAULT_BUCKET)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ public static final PropertyDescriptor SCOPE_NAME = new
PropertyDescriptor.Builder()
+ .name("Scope Name")
+ .description("The name of the scope which is a logical namespace
within a bucket, serving to categorize and organize related collections")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue(DEFAULT_SCOPE)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ public static final PropertyDescriptor COLLECTION_NAME = new
PropertyDescriptor.Builder()
+ .name("Collection Name")
+ .description("The name of collection which is a logical container
within a scope, used to hold documents")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue(DEFAULT_COLLECTION)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ protected volatile CouchbaseClient couchbaseClient;
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ final CouchbaseConnectionService connectionService =
context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class);
+ final CouchbaseContext couchbaseContext = getCouchbaseContext(context);
+ couchbaseClient = connectionService.getClient(couchbaseContext);
+ }
+
+ public Set<String> getRequiredKeys() {
+ return REQUIRED_KEYS;
+ }
+
+ private CouchbaseContext getCouchbaseContext(final ConfigurationContext
context) {
+ final String bucketName =
context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue();
+ final String scopeName =
context.getProperty(SCOPE_NAME).evaluateAttributeExpressions().getValue();
+ final String collectionName =
context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue();
+
+ return new CouchbaseContext(bucketName, scopeName, collectionName,
DocumentType.BINARY);
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java
new file mode 100644
index 00000000000..8a35b0ea2d4
--- /dev/null
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@Tags({"lookup", "enrich", "key", "value", "couchbase"})
+@CapabilityDescription("Lookup a string value from Couchbase Server associated
with the specified key. The coordinates that are passed to the lookup must
contain the key 'key'.")
+public class CouchbaseKeyValueLookupService extends AbstractCouchbaseService
implements StringLookupService {
+
+ public static final PropertyDescriptor LOOKUP_SUB_DOC_PATH = new
PropertyDescriptor.Builder()
+ .name("Lookup Sub-Document Path")
+ .description("The Sub-Document lookup path within the target JSON
document")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = List.of(
+ COUCHBASE_CONNECTION_SERVICE,
+ BUCKET_NAME,
+ SCOPE_NAME,
+ COLLECTION_NAME,
+ LOOKUP_SUB_DOC_PATH
+ );
+
+ private volatile String subDocPath;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ super.onEnabled(context);
+ subDocPath =
context.getProperty(LOOKUP_SUB_DOC_PATH).evaluateAttributeExpressions().getValue();
+ }
+
+ @Override
+ public Optional<String> lookup(final Map<String, Object> coordinates)
throws LookupFailureException {
+ final Object documentId = coordinates.get(KEY);
+
+ if (documentId == null) {
+ return Optional.empty();
+ }
+
+ try {
+ final CouchbaseLookupInResult result =
couchbaseClient.lookupIn(documentId.toString(), subDocPath);
+ return
Optional.ofNullable(result.resultContent()).map(Object::toString);
+ } catch (final CouchbaseDocNotFoundException e) {
+ return Optional.empty();
+ } catch (final Exception e) {
+ throw new LookupFailureException("Key-value lookup for Document ID
[%s] failed".formatted(documentId), e);
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java
new file mode 100644
index 00000000000..9ff3354f0af
--- /dev/null
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import
org.apache.nifi.services.couchbase.exception.CouchbaseCasMismatchException;
+import
org.apache.nifi.services.couchbase.exception.CouchbaseDocExistsException;
+import
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+
+@Tags({"distributed", "cache", "map", "cluster", "couchbase"})
+@CapabilityDescription("""
+ Provides the ability to communicate with a Couchbase Server cluster as
a DistributedMapCacheServer.
+ This can be used in order to share a Map between nodes in a NiFi
cluster.
+ Couchbase Server cluster can provide a high available and persistent
cache storage.""")
+public class CouchbaseMapCacheClient extends AbstractCouchbaseService
implements AtomicDistributedMapCacheClient<Long> {
+
+ private static final List<PropertyDescriptor> PROPERTIES = List.of(
+ COUCHBASE_CONNECTION_SERVICE,
+ BUCKET_NAME,
+ SCOPE_NAME,
+ COLLECTION_NAME
+ );
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final
Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws
IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+ try {
+ final CouchbaseGetResult result =
couchbaseClient.getDocument(documentId);
+ return new AtomicCacheEntry<>(key,
deserializeDocument(valueDeserializer, result.resultContent()), result.cas());
+ } catch (final CouchbaseDocNotFoundException e) {
+ return null;
+ } catch (final CouchbaseException e) {
+ throw new IOException("Failed to fetch cache entry with Document
ID [%s] from Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry,
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws
IOException {
+ final String documentId = serializeDocumentKey(entry.getKey(),
keySerializer);
+ final byte[] document = serializeDocument(entry.getValue(),
valueSerializer);
+ final Optional<Long> revision = entry.getRevision();
+
+ if (revision.isEmpty()) {
+ try {
+ couchbaseClient.insertDocument(documentId, document);
+ return true;
+ } catch (final CouchbaseDocExistsException e) {
+ return false;
+ } catch (final CouchbaseException e) {
+ throw new IOException("Failed to insert cache entry with
Document ID [%s] into Couchbase".formatted(documentId), e);
+ }
+ }
+
+ try {
+ final long casValue = revision.get();
+ couchbaseClient.replaceDocument(documentId, document, casValue);
+ return true;
+ } catch (final CouchbaseDocNotFoundException |
CouchbaseCasMismatchException e) {
+ return false;
+ } catch (final CouchbaseException e) {
+ throw new IOException("Failed to replace cache entry with Document
ID [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public <K, V> boolean putIfAbsent(final K key, final V value, final
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws
IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+ final byte[] document = serializeDocument(value, valueSerializer);
+
+ try {
+ couchbaseClient.insertDocument(documentId, document);
+ return true;
+ } catch (final CouchbaseDocExistsException e) {
+ return false;
+ } catch (final CouchbaseException e) {
+ throw new IOException("Failed to insert cache entry with Document
ID [%s] into Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public <K, V> V getAndPutIfAbsent(final K key, final V value, final
Serializer<K> keySerializer, final Serializer<V> valueSerializer, final
Deserializer<V> valueDeserializer) throws IOException {
+ final V document = get(key, keySerializer, valueDeserializer);
+ if (document != null) {
+ return document;
+ }
+
+ boolean putResult = putIfAbsent(key, value, keySerializer,
valueSerializer);
+ if (!putResult) {
+ return getAndPutIfAbsent(key, value, keySerializer,
valueSerializer, valueDeserializer);
+ }
+ return null;
+ }
+
+ @Override
+ public <K> boolean containsKey(final K key, final Serializer<K>
keySerializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+
+ try {
+ return couchbaseClient.documentExists(documentId);
+ } catch (final CouchbaseException e) {
+ throw new IOException("Failed to check existence of cache entry
with Document ID [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public <K, V> void put(final K key, final V value, final Serializer<K>
keySerializer, final Serializer<V> valueSerializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+ final byte[] document = serializeDocument(value, valueSerializer);
+
+ try {
+ couchbaseClient.upsertDocument(documentId, document);
+ } catch (final CouchbaseException e) {
+ throw new IOException("Failed to insert cache entry with Document
ID [%s] into Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public <K, V> V get(final K key, final Serializer<K> keySerializer, final
Deserializer<V> valueDeserializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+
+ try {
+ final CouchbaseGetResult result =
couchbaseClient.getDocument(documentId);
+ return deserializeDocument(valueDeserializer,
result.resultContent());
+ } catch (final CouchbaseDocNotFoundException e) {
+ return null;
+ } catch (final CouchbaseException e) {
+ throw new IOException("Failed to fetch cache entry with Document
ID [%s] from Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public <K> boolean remove(final K key, final Serializer<K> serializer)
throws IOException {
+ final String documentId = serializeDocumentKey(key, serializer);
+
+ try {
+ couchbaseClient.removeDocument(documentId);
+ return true;
+ } catch (final CouchbaseDocNotFoundException e) {
+ return false;
+ } catch (final CouchbaseException e) {
+ throw new IOException("Failed to remove cache entry with Document
ID [%s] from Couchbase".formatted(documentId), e);
+ }
+ }
+
+ private <S> String serializeDocumentKey(final S key, final Serializer<S>
serializer) throws IOException {
+ final String result;
+
+ if (key instanceof String) {
+ result = (String) key;
+ } else {
+ final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ serializer.serialize(key, stream);
+ result = stream.toString(StandardCharsets.UTF_8);
+ }
+
+ if (result.isEmpty()) {
+ throw new IOException("Cache entry key cannot be empty!");
+ }
+
+ return result;
+ }
+
+ private <S> byte[] serializeDocument(final S value, final Serializer<S>
serializer) throws IOException {
+ final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ serializer.serialize(value, stream);
+ return stream.toByteArray();
+ }
+
+ private static <V> V deserializeDocument(final Deserializer<V>
deserializer, final byte[] value) throws IOException {
+ return deserializer.deserialize(value);
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java
new file mode 100644
index 00000000000..4dcf3b40f5e
--- /dev/null
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@Tags({"lookup", "enrich", "couchbase"})
+@CapabilityDescription("Lookup a record from Couchbase Server associated with
the specified key. The coordinates that are passed to the lookup must contain
the key 'key'.")
+public class CouchbaseRecordLookupService extends AbstractCouchbaseService
implements RecordLookupService {
+
+ static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("Record Reader")
+ .description("The Record Reader to use for parsing fetched
document from Couchbase Server")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = List.of(
+ COUCHBASE_CONNECTION_SERVICE,
+ BUCKET_NAME,
+ SCOPE_NAME,
+ COLLECTION_NAME,
+ RECORD_READER
+ );
+
+ private volatile RecordReaderFactory readerFactory;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ super.onEnabled(context);
+ readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ }
+
+ @Override
+ public Optional<Record> lookup(final Map<String, Object> coordinates)
throws LookupFailureException {
+ final Object documentId = coordinates.get(KEY);
+
+ if (documentId == null) {
+ return Optional.empty();
+ }
+
+ CouchbaseGetResult result;
+ try {
+ result = couchbaseClient.getDocument(documentId.toString());
+ } catch (final CouchbaseDocNotFoundException e) {
+ return Optional.empty();
+ } catch (final Exception e) {
+ throw new LookupFailureException("Failed to look up record with
Document ID [%s] in Couchbase.".formatted(documentId), e);
+ }
+
+ try (final InputStream input = new
ByteArrayInputStream(result.resultContent())) {
+ final long inputLength = result.resultContent().length;
+ final Map<String, String> stringMap =
coordinates.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> String.valueOf(e.getValue())
+ ));
+
+ final RecordReader recordReader =
readerFactory.createRecordReader(stringMap, input, inputLength, getLogger());
+ return Optional.ofNullable(recordReader.nextRecord());
+ } catch (final Exception e) {
+ throw new LookupFailureException("Failed to parse the looked-up
record with Document ID [%s]".formatted(documentId), e);
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 00000000000..772920f46fe
--- /dev/null
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.couchbase.CouchbaseKeyValueLookupService
+org.apache.nifi.services.couchbase.CouchbaseRecordLookupService
+org.apache.nifi.services.couchbase.CouchbaseMapCacheClient
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java
similarity index 50%
copy from
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
copy to
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java
index 9ad04fbe01d..5a631c176a3 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java
@@ -16,16 +16,21 @@
*/
package org.apache.nifi.services.couchbase;
-import org.apache.nifi.services.couchbase.exception.CouchbaseException;
-import org.apache.nifi.services.couchbase.exception.ExceptionCategory;
-import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
-import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
-public interface CouchbaseClient {
+abstract class AbstractCouchbaseServiceTest {
- CouchbaseGetResult getDocument(String documentId) throws
CouchbaseException;
+ protected static final String CONNECTION_SERVICE_ID =
"couchbaseConnectionService";
+ protected static final String TEST_DOCUMENT_ID = "test-document-id";
+ protected static final String TEST_DOCUMENT_CONTENT =
"{\"key\":\"value\"}";
+ protected static final long TEST_CAS = 1L;
- CouchbaseUpsertResult upsertDocument(String documentId, byte[] content)
throws CouchbaseException;
-
- ExceptionCategory getExceptionCategory(Throwable throwable);
+ protected static CouchbaseConnectionService
mockConnectionService(CouchbaseClient client) {
+ final CouchbaseConnectionService connectionService =
mock(CouchbaseConnectionService.class);
+
when(connectionService.getIdentifier()).thenReturn(CONNECTION_SERVICE_ID);
+ when(connectionService.getClient(any())).thenReturn(client);
+ return connectionService;
+ }
}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java
new file mode 100644
index 00000000000..9ccdb5ef8de
--- /dev/null
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.lookup.LookupFailureException;
+import
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static
org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class CouchbaseKeyValueLookupServiceTest extends AbstractCouchbaseServiceTest {
+
+ private CouchbaseKeyValueLookupService lookupService;
+ private CouchbaseClient client;
+
+ @BeforeEach
+ void init() {
+ lookupService = new CouchbaseKeyValueLookupService();
+ client = mock(CouchbaseClient.class);
+
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
+ final MockControllerServiceInitializationContext
serviceInitializationContext = new
MockControllerServiceInitializationContext(connectionService,
CONNECTION_SERVICE_ID);
+ final Map<PropertyDescriptor, String> properties =
Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ final MockConfigurationContext context = new
MockConfigurationContext(properties, serviceInitializationContext, new
HashMap<>());
+
+ lookupService.onEnabled(context);
+ }
+
+ @Test
+ void testSuccessfulLookup() throws CouchbaseException,
LookupFailureException {
+ when(client.lookupIn(anyString(), any())).thenReturn(new
CouchbaseLookupInResult("test result", TEST_CAS));
+
+ final Map<String, Object> coordinates = Collections.singletonMap(KEY,
TEST_DOCUMENT_ID);
+ final Optional<String> result = lookupService.lookup(coordinates);
+
+ assertTrue(result.isPresent());
+ assertEquals("test result", result.get());
+ }
+
+ @Test
+ void testLookupFailure() throws CouchbaseException {
+ when(client.lookupIn(anyString(), any())).thenThrow(new
CouchbaseException("Test exception"));
+
+ final Map<String, Object> coordinates = Collections.singletonMap(KEY,
TEST_DOCUMENT_ID);
+
+ assertThrows(LookupFailureException.class, () ->
lookupService.lookup(coordinates));
+ }
+
+ @Test
+ void testDocumentNotFoundInLookup() throws CouchbaseException,
LookupFailureException {
+ when(client.lookupIn(anyString(), any())).thenThrow(new
CouchbaseDocNotFoundException("Test doc not found exception", null));
+
+ final Map<String, Object> coordinates = Collections.singletonMap(KEY,
TEST_DOCUMENT_ID);
+ final Optional<String> result = lookupService.lookup(coordinates);
+
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void testMissingKey() throws LookupFailureException {
+ final Optional<String> result =
lookupService.lookup(Collections.emptyMap());
+
+ assertTrue(result.isEmpty());
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java
new file mode 100644
index 00000000000..45475cfcf25
--- /dev/null
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class CouchbaseMapCacheClientTest extends AbstractCouchbaseServiceTest {
+
+ private final Serializer<String> stringSerializer = (value, output) ->
output.write(value.getBytes(StandardCharsets.UTF_8));
+ private final Deserializer<String> stringDeserializer = input -> new
String(input, StandardCharsets.UTF_8);
+ private CouchbaseMapCacheClient mapCacheClient;
+ private CouchbaseClient client;
+
+ @BeforeEach
+ void init() {
+ mapCacheClient = new CouchbaseMapCacheClient();
+ client = mock(CouchbaseClient.class);
+
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
+ final MockControllerServiceInitializationContext
serviceInitializationContext = new
MockControllerServiceInitializationContext(connectionService,
CONNECTION_SERVICE_ID);
+ final Map<PropertyDescriptor, String> properties =
Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ final MockConfigurationContext context = new
MockConfigurationContext(properties, serviceInitializationContext, new
HashMap<>());
+
+ mapCacheClient.onEnabled(context);
+ }
+
+ @Test
+ void testCacheGet() throws CouchbaseException, IOException {
+ when(client.getDocument(anyString())).thenReturn(new
CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS));
+
+ final String result = mapCacheClient.get(TEST_DOCUMENT_ID,
stringSerializer, stringDeserializer);
+
+ assertEquals(TEST_DOCUMENT_CONTENT, result);
+ }
+
+ @Test
+ void testCacheGetFailure() throws CouchbaseException {
+ when(client.getDocument(anyString())).thenThrow(new
CouchbaseException("Test exception", null));
+
+ assertThrows(IOException.class, () ->
mapCacheClient.get(TEST_DOCUMENT_ID, stringSerializer, stringDeserializer));
+ }
+
+ @Test
+ void testCacheGetNotFound() throws CouchbaseException, IOException {
+ when(client.getDocument(anyString())).thenThrow(new
CouchbaseDocNotFoundException("Test doc not found exception", null));
+
+ final String result = mapCacheClient.get(TEST_DOCUMENT_ID,
stringSerializer, stringDeserializer);
+
+ assertNull(result);
+ }
+
+ @Test
+ void testCachePut() throws CouchbaseException, IOException {
+ when(client.upsertDocument(anyString(), any())).thenReturn(new
CouchbaseUpsertResult(TEST_CAS));
+
+ mapCacheClient.put(TEST_DOCUMENT_ID, TEST_DOCUMENT_CONTENT,
stringSerializer, stringSerializer);
+
+ verify(client, times(1)).upsertDocument(eq(TEST_DOCUMENT_ID),
eq(TEST_DOCUMENT_CONTENT.getBytes()));
+ }
+
+ @Test
+ void testCachePutFailure() throws CouchbaseException {
+ when(client.upsertDocument(anyString(), any())).thenThrow(new
CouchbaseException("Test exception"));
+
+ assertThrows(IOException.class, () ->
mapCacheClient.put(TEST_DOCUMENT_ID, TEST_DOCUMENT_CONTENT, stringSerializer,
stringSerializer));
+ }
+
+ @Test
+ void testCacheRemove() throws CouchbaseException, IOException {
+ mapCacheClient.remove(TEST_DOCUMENT_ID, stringSerializer);
+
+ verify(client, times(1)).removeDocument(eq(TEST_DOCUMENT_ID));
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java
new file mode 100644
index 00000000000..d55f5cdb697
--- /dev/null
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static
org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class CouchbaseRecordLookupServiceTest extends AbstractCouchbaseServiceTest {
+
+ private static final String LOOKUP_SERVICE_ID = "lookupService";
+ private static final String RECORD_READER_ID = "recordReaderService";
+
+ private CouchbaseRecordLookupService lookupService;
+ private CouchbaseClient client;
+
+ @BeforeEach
+ void setup() throws InitializationException {
+ final TestRunner runner =
TestRunners.newTestRunner(NoOpProcessor.class);
+ lookupService = new CouchbaseRecordLookupService();
+ client = mock(CouchbaseClient.class);
+
+ final CouchbaseConnectionService connectionService =
mockConnectionService(client);
+
+ final MockRecordParser readerFactory = new MockRecordParser();
+ readerFactory.addSchemaField("key", RecordFieldType.STRING);
+ readerFactory.addRecord("value");
+
+ runner.addControllerService(CONNECTION_SERVICE_ID, connectionService);
+ runner.addControllerService(RECORD_READER_ID, readerFactory);
+ runner.addControllerService(LOOKUP_SERVICE_ID, lookupService);
+
+ runner.enableControllerService(connectionService);
+ runner.enableControllerService(readerFactory);
+
+ runner.setProperty(lookupService, COUCHBASE_CONNECTION_SERVICE,
CONNECTION_SERVICE_ID);
+ runner.setProperty(lookupService,
CouchbaseRecordLookupService.RECORD_READER, RECORD_READER_ID);
+
+ runner.enableControllerService(lookupService);
+ }
+
+ @Test
+ void testSuccessfulLookup() throws LookupFailureException,
CouchbaseException {
+ when(client.getDocument(anyString())).thenReturn(new
CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS));
+
+ final Map<String, Object> coordinates = Collections.singletonMap(KEY,
TEST_DOCUMENT_ID);
+ final Optional<Record> result = lookupService.lookup(coordinates);
+
+ assertTrue(result.isPresent());
+
+ final List<RecordField> fields = Collections.singletonList(new
RecordField("key", RecordFieldType.STRING.getDataType()));
+ final Record expectedRecord = new MapRecord(new
SimpleRecordSchema(fields), Collections.singletonMap("key", "value"));
+
+ assertEquals(expectedRecord, result.get());
+ }
+
+ @Test
+ void testLookupFailure() throws CouchbaseException {
+ when(client.getDocument(anyString())).thenThrow(new
CouchbaseException("Test exception"));
+
+ final Map<String, Object> coordinates = Collections.singletonMap(KEY,
TEST_DOCUMENT_ID);
+
+ assertThrows(LookupFailureException.class, () ->
lookupService.lookup(coordinates));
+ }
+
+ @Test
+ void testMissingKey() throws LookupFailureException {
+ final Optional<Record> result =
lookupService.lookup(Collections.emptyMap());
+
+ assertTrue(result.isEmpty());
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
index dce98be9196..e5b7e436999 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
@@ -45,20 +45,34 @@ import com.couchbase.client.java.Collection;
import com.couchbase.client.java.codec.RawBinaryTranscoder;
import com.couchbase.client.java.codec.RawJsonTranscoder;
import com.couchbase.client.java.codec.Transcoder;
+import com.couchbase.client.java.json.JsonArray;
+import com.couchbase.client.java.json.JsonObject;
+import com.couchbase.client.java.kv.ExistsResult;
import com.couchbase.client.java.kv.GetOptions;
import com.couchbase.client.java.kv.GetResult;
+import com.couchbase.client.java.kv.InsertOptions;
+import com.couchbase.client.java.kv.LookupInResult;
+import com.couchbase.client.java.kv.LookupInSpec;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.java.kv.PersistTo;
+import com.couchbase.client.java.kv.ReplaceOptions;
import com.couchbase.client.java.kv.ReplicateTo;
import com.couchbase.client.java.kv.UpsertOptions;
+import
org.apache.nifi.services.couchbase.exception.CouchbaseCasMismatchException;
+import
org.apache.nifi.services.couchbase.exception.CouchbaseDocExistsException;
+import
org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.exception.ExceptionCategory;
import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
import org.apache.nifi.services.couchbase.utils.DocumentType;
import org.apache.nifi.services.couchbase.utils.JsonValidator;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
@@ -104,7 +118,7 @@ class StandardCouchbaseClient implements CouchbaseClient {
entry(ValueTooLargeException.class, FAILURE)
);
- StandardCouchbaseClient(Collection collection, DocumentType documentType,
PersistTo persistTo, ReplicateTo replicateTo) {
+ StandardCouchbaseClient(final Collection collection, final DocumentType
documentType, final PersistTo persistTo, final ReplicateTo replicateTo) {
this.collection = collection;
this.documentType = documentType;
this.persistTo = persistTo;
@@ -112,23 +126,25 @@ class StandardCouchbaseClient implements CouchbaseClient {
}
@Override
- public CouchbaseGetResult getDocument(String documentId) throws
CouchbaseException {
+ public CouchbaseGetResult getDocument(final String documentId) throws
CouchbaseException {
try {
final GetResult result = collection.get(documentId,
GetOptions.getOptions().transcoder(getTranscoder(documentType)));
return new CouchbaseGetResult(result.contentAsBytes(),
result.cas());
- } catch (Exception e) {
+ } catch (final DocumentNotFoundException e) {
+ throw new CouchbaseDocNotFoundException("Couchbase document with
key [%s] not found".formatted(documentId), e);
+ } catch (final Exception e) {
throw new CouchbaseException("Failed to get document [%s] from
Couchbase".formatted(documentId), e);
}
}
@Override
- public CouchbaseUpsertResult upsertDocument(String documentId, byte[]
content) throws CouchbaseException {
- try {
- if (!getInputValidator(documentType).test(content)) {
- throw new CouchbaseException("The provided input is invalid");
- }
+ public CouchbaseUpsertResult upsertDocument(final String documentId, final
byte[] content) throws CouchbaseException {
+ if (!getInputValidator(documentType).test(content)) {
+ throw new CouchbaseException("The provided input is invalid for
document [%s]".formatted(documentId));
+ }
+ try {
final MutationResult result = collection.upsert(documentId,
content,
UpsertOptions.upsertOptions()
.durability(persistTo, replicateTo)
@@ -136,27 +152,126 @@ class StandardCouchbaseClient implements CouchbaseClient
{
.clientContext(new HashMap<>()));
return new CouchbaseUpsertResult(result.cas());
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new CouchbaseException("Failed to upsert document [%s] in
Couchbase".formatted(documentId), e);
}
}
@Override
- public ExceptionCategory getExceptionCategory(Throwable throwable) {
+ public boolean documentExists(final String documentId) throws
CouchbaseException {
+ try {
+ final ExistsResult result = collection.exists(documentId);
+ return result.exists();
+ } catch (final Exception e) {
+ throw new CouchbaseException("Failed to check document [%s] in
Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void insertDocument(final String documentId, final byte[] content)
throws CouchbaseException {
+ if (!getInputValidator(documentType).test(content)) {
+ throw new CouchbaseException("The provided input is invalid for
document [%s]".formatted(documentId));
+ }
+
+ try {
+ collection.insert(documentId, content,
+ InsertOptions.insertOptions()
+ .durability(persistTo, replicateTo)
+ .transcoder(getTranscoder(documentType))
+ .clientContext(new HashMap<>()));
+ } catch (final DocumentExistsException e) {
+ throw new CouchbaseDocExistsException("Document with key [%s]
already exists".formatted(documentId), e);
+ } catch (final Exception e) {
+ throw new CouchbaseException("Failed to insert document [%s] in
Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void removeDocument(final String documentId) throws
CouchbaseException {
+ try {
+ collection.remove(documentId);
+ } catch (final DocumentNotFoundException e) {
+ throw new CouchbaseDocNotFoundException("Couchbase document with
key [%s] not found".formatted(documentId), e);
+ } catch (final Exception e) {
+ throw new CouchbaseException("Failed to remove document [%s] in
Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void replaceDocument(final String documentId, final byte[] content,
final long cas) throws CouchbaseException {
+ if (!getInputValidator(documentType).test(content)) {
+ throw new CouchbaseException("The provided input is invalid for
document [%s]".formatted(documentId));
+ }
+
+ try {
+ collection.replace(documentId, content,
+ ReplaceOptions.replaceOptions()
+ .cas(cas)
+ .durability(persistTo, replicateTo)
+ .transcoder(getTranscoder(documentType))
+ .clientContext(new HashMap<>()));
+ } catch (final CasMismatchException e) {
+ throw new CouchbaseCasMismatchException("Couchbase document with
key [%s] has been concurrently modified".formatted(documentId), e);
+ } catch (final DocumentNotFoundException e) {
+ throw new CouchbaseDocNotFoundException("Couchbase document with
key [%s] not found".formatted(documentId), e);
+ } catch (final Exception e) {
+ throw new CouchbaseException("Failed to replace document [%s] in
Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public CouchbaseLookupInResult lookupIn(final String documentId, final
String subDocPath) throws CouchbaseException {
+ try {
+ final String documentPath = subDocPath == null ? "" : subDocPath;
+ final LookupInResult result = collection.lookupIn(documentId,
Collections.singletonList(LookupInSpec.get(documentPath)));
+
+ if (!result.exists(0)) {
+ throw new CouchbaseException("No value found on the requested
path [%s] in Couchbase".formatted(subDocPath));
+ }
+
+ Object lookupInResult;
+ try {
+ lookupInResult = result.contentAs(0, Object.class);
+ } catch (final DecodingFailureException e) {
+ lookupInResult = result.contentAs(0, byte[].class);
+ }
+
+ return new
CouchbaseLookupInResult(deserializeLookupInResult(lookupInResult),
result.cas());
+ } catch (final DocumentNotFoundException e) {
+ throw new CouchbaseDocNotFoundException("Couchbase document with
key [%s] not found".formatted(documentId), e);
+ } catch (final Exception e) {
+ throw new CouchbaseException("Failed to look up in document [%s]
in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public ExceptionCategory getExceptionCategory(final Throwable throwable) {
return exceptionMapping.getOrDefault(throwable.getClass(), FAILURE);
}
- private Transcoder getTranscoder(DocumentType documentType) {
+ private Transcoder getTranscoder(final DocumentType documentType) {
return switch (documentType) {
case JSON -> RawJsonTranscoder.INSTANCE;
case BINARY -> RawBinaryTranscoder.INSTANCE;
};
}
- private Predicate<byte[]> getInputValidator(DocumentType documentType) {
+ private Predicate<byte[]> getInputValidator(final DocumentType
documentType) {
return switch (documentType) {
case JSON -> new JsonValidator();
case BINARY -> v -> true;
};
}
+
+ private String deserializeLookupInResult(final Object result) {
+ return switch (result) {
+ case null -> null;
+ case String s -> s;
+ case Map map -> JsonObject.from(map).toString();
+ case List list -> JsonArray.from(list).toString();
+ case byte[] bytes -> new String(bytes, StandardCharsets.UTF_8);
+ default -> result.toString();
+ };
+
+ }
}
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseConnectionService.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseConnectionService.java
index beb516daeaf..92a31329c40 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseConnectionService.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseConnectionService.java
@@ -58,7 +58,7 @@ public class StandardCouchbaseConnectionService extends
AbstractControllerServic
public static final PropertyDescriptor USERNAME = new
PropertyDescriptor.Builder()
.name("Username")
- .description("The username to authenticate to the Couchbase
client.")
+ .description("The username to authenticate to the Couchbase
client")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -66,7 +66,7 @@ public class StandardCouchbaseConnectionService extends
AbstractControllerServic
public static final PropertyDescriptor PASSWORD = new
PropertyDescriptor.Builder()
.name("Password")
- .description("The user's password to authenticate to the Couchbase
client.")
+ .description("The user's password to authenticate to the Couchbase
client")
.required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -74,13 +74,13 @@ public class StandardCouchbaseConnectionService extends
AbstractControllerServic
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
.name("SSL Context Service")
- .description("Service supporting SSL communication configuration.
The service is using one-way SSL, so only the trust store properties will be
used.")
+ .description("Service supporting SSL communication configuration.
The service is using one-way SSL, so only the trust store properties will be
used")
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor PERSISTENCE_STRATEGY = new
PropertyDescriptor.Builder()
.name("Persistence Strategy")
- .description("Durability constraint about disk persistence.")
+ .description("Durability constraint about disk persistence")
.required(true)
.allowableValues(PersistTo.values())
.defaultValue(PersistTo.NONE.toString())
@@ -88,7 +88,7 @@ public class StandardCouchbaseConnectionService extends
AbstractControllerServic
public static final PropertyDescriptor REPLICATION_STRATEGY = new
PropertyDescriptor.Builder()
.name("Replication Strategy")
- .description("Durability constraint about replication.")
+ .description("Durability constraint about replication")
.required(true)
.allowableValues(ReplicateTo.values())
.defaultValue(ReplicateTo.NONE.toString())
diff --git
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
similarity index 50%
rename from
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java
rename to
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
index ad3d7e7d878..62a9bc064b4 100644
---
a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java
+++
b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
@@ -18,16 +18,23 @@ package org.apache.nifi.services.couchbase;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.kv.GetResult;
+import com.couchbase.client.java.kv.LookupInResult;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.java.kv.PersistTo;
import com.couchbase.client.java.kv.ReplicateTo;
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static org.apache.nifi.services.couchbase.utils.DocumentType.JSON;
@@ -36,11 +43,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class TestCouchbaseClient {
+public class CouchbaseClientTest {
private static final String TEST_DOCUMENT_ID = "test-document-id";
private static final long TEST_CAS = 1L;
@@ -72,7 +80,24 @@ public class TestCouchbaseClient {
final StandardCouchbaseClient client = new
StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
final Exception exception = assertThrows(CouchbaseException.class, ()
-> client.upsertDocument(TEST_DOCUMENT_ID, content.getBytes()));
+ assertTrue(exception.getMessage().contains("The provided input is
invalid"));
+ }
+
+ @Test
+ void testInsertJsonDocumentValidationFailure() {
+ final String content = "{invalid-json}";
+ final StandardCouchbaseClient client = new
StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+
+ final Exception exception = assertThrows(CouchbaseException.class, ()
-> client.insertDocument(TEST_DOCUMENT_ID, content.getBytes()));
+ assertTrue(exception.getMessage().contains("The provided input is
invalid"));
+ }
+
+ @Test
+ void testReplaceJsonDocumentValidationFailure() {
+ final String content = "{invalid-json}";
+ final StandardCouchbaseClient client = new
StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+ final Exception exception = assertThrows(CouchbaseException.class, ()
-> client.replaceDocument(TEST_DOCUMENT_ID, content.getBytes(), TEST_CAS));
assertTrue(exception.getMessage().contains("The provided input is
invalid"));
}
@@ -94,4 +119,61 @@ public class TestCouchbaseClient {
assertEquals(TEST_CAS, getResult.cas());
assertArrayEquals(content.getBytes(), getResult.resultContent());
}
+
+ @Test
+ void testLookupInWithMapResult() throws CouchbaseException {
+ final String expectedResult = "{\"name\":\"John\",\"age\":\"20\"}";
+ final StandardCouchbaseClient client = new
StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+
+ Map<String, String> lookupInContent = new HashMap<>();
+ lookupInContent.put("name", "John");
+ lookupInContent.put("age", "20");
+
+ final LookupInResult result = mock(LookupInResult.class);
+ when(result.contentAs(anyInt(),
any(Class.class))).thenReturn(lookupInContent);
+ when(result.exists(anyInt())).thenReturn(true);
+ when(result.cas()).thenReturn(TEST_CAS);
+
+ when(collection.lookupIn(anyString(), any())).thenReturn(result);
+
+ final CouchbaseLookupInResult lookupInResult =
client.lookupIn(TEST_DOCUMENT_ID, "");
+
+ assertEquals(expectedResult, lookupInResult.resultContent());
+ assertEquals(TEST_CAS, lookupInResult.cas());
+ }
+
+ @Test
+ void testLookupInWithArrayResult() throws CouchbaseException {
+ final String expectedResult =
"[{\"name\":\"John\"},{\"name\":\"Jack\"}]";
+ final StandardCouchbaseClient client = new
StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+
+ List<Object> lookupInContent = new ArrayList<>();
+ lookupInContent.add(Collections.singletonMap("name", "John"));
+ lookupInContent.add(Collections.singletonMap("name", "Jack"));
+
+ final LookupInResult result = mock(LookupInResult.class);
+ when(result.contentAs(anyInt(),
any(Class.class))).thenReturn(lookupInContent);
+ when(result.exists(anyInt())).thenReturn(true);
+ when(result.cas()).thenReturn(TEST_CAS);
+
+ when(collection.lookupIn(anyString(), any())).thenReturn(result);
+
+ final CouchbaseLookupInResult lookupInResult =
client.lookupIn(TEST_DOCUMENT_ID, "");
+
+ assertEquals(expectedResult, lookupInResult.resultContent());
+ assertEquals(TEST_CAS, lookupInResult.cas());
+ }
+
+ @Test
+ void testLookupInWithNoResult() {
+ final StandardCouchbaseClient client = new
StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+
+ final LookupInResult result = mock(LookupInResult.class);
+ when(result.exists(anyInt())).thenReturn(false);
+
+ when(collection.lookupIn(anyString(), any())).thenReturn(result);
+
+ final Exception exception = assertThrows(CouchbaseException.class, ()
-> client.lookupIn(TEST_DOCUMENT_ID, "test-path"));
+ assertTrue(exception.getCause().getMessage().contains("No value found
on the requested path [test-path] in Couchbase"));
+ }
}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml
b/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml
index f332f74ee15..33af5cee38b 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml
@@ -34,6 +34,7 @@
<module>nifi-couchbase-standard-services-nar</module>
<module>nifi-couchbase-services-api</module>
<module>nifi-couchbase-services-api-nar</module>
+ <module>nifi-couchbase-services</module>
</modules>
</project>
\ No newline at end of file