This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 3625a73 NIFI-7821 Added Cassandra-based DMC. 3625a73 is described below commit 3625a73d91f9504e7117043402613ddbe3a0cbe9 Author: Mike Thomsen <mthom...@apache.org> AuthorDate: Thu Oct 29 15:02:14 2020 -0400 NIFI-7821 Added Cassandra-based DMC. NIFI-7821 Updated configuration documentation. NIFI-7821 Fixed getAndPutIfAbsent and added int test. Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #4635 --- .../pom.xml | 111 ++++++++++ .../cassandra/CassandraDistributedMapCache.java | 241 +++++++++++++++++++++ .../nifi/controller/cassandra/QueryUtils.java | 44 ++++ .../org.apache.nifi.controller.ControllerService | 15 ++ .../nifi/CassandraDistributedMapCacheIT.groovy | 133 ++++++++++++ .../src/test/java/.gitignore | 0 .../nifi-cassandra-services-nar/pom.xml | 5 + nifi-nar-bundles/nifi-cassandra-bundle/pom.xml | 1 + 8 files changed, 550 insertions(+) diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml new file mode 100644 index 0000000..75939d5 --- /dev/null +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml @@ -0,0 +1,111 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cassandra-bundle</artifactId> + <version>1.13.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cassandra-distributedmapcache-service</artifactId> + <version>1.13.0-SNAPSHOT</version> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <version>1.13.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + <version>1.13.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <version>1.13.0-SNAPSHOT</version> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <version>1.13.0-SNAPSHOT</version> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-avro-record-utils</artifactId> + <version>1.13.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.10.1</version> + </dependency> + <dependency> + <groupId>org.codehaus.groovy</groupId> + <artifactId>groovy-all</artifactId> + <type>pom</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>1.13.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock-record-utils</artifactId> + <version>1.13.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + <version>1.13.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cassandra-services-api</artifactId> + <version>1.13.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cassandra-services</artifactId> + <version>1.13.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java new file mode 100644 index 0000000..a921ec1 --- /dev/null +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.cassandra.CassandraSessionProviderService; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.controller.cassandra.QueryUtils.createDeleteStatement; +import static org.apache.nifi.controller.cassandra.QueryUtils.createExistsQuery; +import static org.apache.nifi.controller.cassandra.QueryUtils.createFetchQuery; +import static org.apache.nifi.controller.cassandra.QueryUtils.createInsertStatement; + +@Tags({"map", "cache", "distributed", "cassandra"}) +@CapabilityDescription("Provides a DistributedMapCache client that is based on Apache Cassandra.") +public class CassandraDistributedMapCache extends AbstractControllerService implements DistributedMapCacheClient { + public static final PropertyDescriptor SESSION_PROVIDER = new PropertyDescriptor.Builder() + .name("cassandra-dmc-session-provider") + .displayName("Session Provider") + .description("The client service that will configure the cassandra client connection.") + .required(true) + .identifiesControllerService(CassandraSessionProviderService.class) + .build(); + + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("cassandra-dmc-table-name") + .displayName("Table Name") + .description("The name of the table where the cache will be stored.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor KEY_FIELD_NAME = new PropertyDescriptor.Builder() + .name("cassandra-dmc-key-field-name") + .displayName("Key Field Name") + .description("The name of the field that acts as the unique key. (The CQL type should be \"blob\")") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor VALUE_FIELD_NAME = new PropertyDescriptor.Builder() + .name("cassandra-dmc-value-field-name") + .displayName("Value Field Name") + .description("The name of the field that will store the value. (The CQL type should be \"blob\")") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder() + .name("cassandra-dmc-ttl") + .displayName("TTL") + .description("If configured, this will set a TTL (Time to Live) for each row inserted into the table so that " + + "old cache items expire after a certain period of time.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .required(false) + .build(); + + public static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + SESSION_PROVIDER, TABLE_NAME, KEY_FIELD_NAME, VALUE_FIELD_NAME, TTL + )); + + private CassandraSessionProviderService sessionProviderService; + private String tableName; + private String keyField; + private String valueField; + private Long ttl; + + private Session session; + private PreparedStatement deleteStatement; + private PreparedStatement existsStatement; + private PreparedStatement fetchStatement; + private PreparedStatement insertStatement; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @OnEnabled + public void onEnabled(ConfigurationContext context) { + sessionProviderService = context.getProperty(SESSION_PROVIDER).asControllerService(CassandraSessionProviderService.class); + tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); + keyField = context.getProperty(KEY_FIELD_NAME).evaluateAttributeExpressions().getValue(); + valueField = context.getProperty(VALUE_FIELD_NAME).evaluateAttributeExpressions().getValue(); + if (context.getProperty(TTL).isSet()) { + ttl = context.getProperty(TTL).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS); + } + + session = sessionProviderService.getCassandraSession(); + + deleteStatement = session.prepare(createDeleteStatement(keyField, tableName)); + existsStatement = session.prepare(createExistsQuery(keyField, tableName)); + fetchStatement = session.prepare(createFetchQuery(keyField, valueField, tableName)); + insertStatement = session.prepare(createInsertStatement(keyField, valueField, tableName, ttl)); + } + + @OnDisabled + public void onDisabled() { + session = null; + deleteStatement = null; + existsStatement = null; + fetchStatement = null; + insertStatement = null; + } + + @Override + public <K, V> boolean putIfAbsent(K k, V v, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { + if (containsKey(k, keySerializer)) { + return false; + } else { + put(k, v, keySerializer, valueSerializer); + return true; + } + } + + @Override + public <K, V> V getAndPutIfAbsent(K k, V v, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> deserializer) throws IOException { + V got = get(k, keySerializer, deserializer); + boolean wasAbsent = putIfAbsent(k, v, keySerializer, valueSerializer); + + return !wasAbsent ? got : null; + } + + @Override + public <K> boolean containsKey(K k, Serializer<K> serializer) throws IOException { + byte[] key = serializeKey(k, serializer); + + BoundStatement statement = existsStatement.bind(); + ByteBuffer buffer = ByteBuffer.wrap(key); + statement.setBytes(0, buffer); + ResultSet rs =session.execute(statement); + Iterator<Row> iterator = rs.iterator(); + + if (iterator.hasNext()) { + Row row = iterator.next(); + long value = row.getLong("exist_count"); + return value > 0; + } else { + return false; + } + } + + @Override + public <K, V> void put(K k, V v, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { + BoundStatement statement = insertStatement.bind(); + statement.setBytes(0, ByteBuffer.wrap(serializeKey(k, keySerializer))); + statement.setBytes(1, ByteBuffer.wrap(serializeValue(v, valueSerializer))); + session.execute(statement); + } + + @Override + public <K, V> V get(K k, Serializer<K> serializer, Deserializer<V> deserializer) throws IOException { + BoundStatement boundStatement = fetchStatement.bind(); + boundStatement.setBytes(0, ByteBuffer.wrap(serializeKey(k, serializer))); + ResultSet rs = session.execute(boundStatement); + Iterator<Row> iterator = rs.iterator(); + if (!iterator.hasNext()) { + return null; + } + + Row fetched = iterator.next(); + ByteBuffer buffer = fetched.getBytes(valueField); + + byte[] content = buffer.array(); + + return deserializer.deserialize(content); + } + + @Override + public void close() throws IOException { + + } + + @Override + public <K> boolean remove(K k, Serializer<K> serializer) throws IOException { + BoundStatement delete = deleteStatement.bind(); + delete.setBytes(0, ByteBuffer.wrap(serializeKey(k, serializer))); + session.execute(delete); + + return true; + } + + @Override + public long removeByPattern(String s) throws IOException { + throw new UnsupportedOperationException(); + } + + private <K> byte[] serializeKey(K k, Serializer<K> keySerializer) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + keySerializer.serialize(k, out); + out.close(); + + return out.toByteArray(); + } + + private <V> byte[] serializeValue(V v, Serializer<V> valueSerializer) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + valueSerializer.serialize(v, out); + out.close(); + + return out.toByteArray(); + } +} diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/QueryUtils.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/QueryUtils.java new file mode 100644 index 0000000..cc42dd7 --- /dev/null +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/QueryUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.cassandra; + +public class QueryUtils { + private QueryUtils() {} + + public static String createDeleteStatement(String keyField, String table) { + return String.format("DELETE FROM %s WHERE %s = ?", table, keyField); + } + + public static String createExistsQuery(String keyField, String table) { + return String.format("SELECT COUNT(*) as exist_count FROM %s WHERE %s = ?", table, keyField); + } + + public static String createFetchQuery(String keyField, String valueField, String table) { + return String.format("SELECT %s FROM %s WHERE %s = ?", valueField, table, keyField); + } + + public static String createInsertStatement(String keyField, String valueField, String table, Long ttl) { + String retVal = String.format("INSERT INTO %s (%s, %s) VALUES(?, ?)", table, keyField, valueField); + + if (ttl != null) { + retVal += String.format(" using ttl %d", ttl); + } + + return retVal; + } +} diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..c3d32c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.controller.cassandra.CassandraDistributedMapCache \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy new file mode 100644 index 0000000..4db419d --- /dev/null +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi + +import com.datastax.driver.core.Session +import org.apache.nifi.controller.cassandra.CassandraDistributedMapCache +import org.apache.nifi.distributed.cache.client.Deserializer +import org.apache.nifi.distributed.cache.client.Serializer +import org.apache.nifi.processor.AbstractProcessor +import org.apache.nifi.processor.ProcessContext +import org.apache.nifi.processor.ProcessSession +import org.apache.nifi.processor.exception.ProcessException +import org.apache.nifi.service.CassandraSessionProvider +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners +import org.junit.AfterClass +import org.junit.BeforeClass +import org.junit.Test +/** + * Setup instructions: + * + * docker run -p 7000:7000 -p 9042:9042 --name cassandra --restart always -d cassandra:3 + * + * docker exec -it cassandra cqlsh + * + * Keyspace CQL: create keyspace nifi_test with replication = { 'replication_factor': 1, 'class': 'SimpleStrategy' } ; + * + * Table SQL: create table dmc (id blob, value blob, primary key(id)); + */ +class CassandraDistributedMapCacheIT { + static TestRunner runner + static CassandraDistributedMapCache distributedMapCache + static Session session + + @BeforeClass + static void setup() { + runner = TestRunners.newTestRunner(new AbstractProcessor() { + @Override + void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException { + + } + }) + distributedMapCache = new CassandraDistributedMapCache() + + def cassandraService = new CassandraSessionProvider() + runner.addControllerService("provider", cassandraService) + runner.addControllerService("dmc", distributedMapCache) + runner.setProperty(cassandraService, CassandraSessionProvider.CONTACT_POINTS, "localhost:9042") + runner.setProperty(cassandraService, CassandraSessionProvider.KEYSPACE, "nifi_test") + runner.setProperty(distributedMapCache, CassandraDistributedMapCache.SESSION_PROVIDER, "provider") + runner.setProperty(distributedMapCache, CassandraDistributedMapCache.TABLE_NAME, "dmc") + runner.setProperty(distributedMapCache, CassandraDistributedMapCache.KEY_FIELD_NAME, "id") + runner.setProperty(distributedMapCache, CassandraDistributedMapCache.VALUE_FIELD_NAME, "value") + runner.setProperty(distributedMapCache, CassandraDistributedMapCache.TTL, "5 sec") + runner.enableControllerService(cassandraService) + runner.enableControllerService(distributedMapCache) + runner.assertValid() + + session = cassandraService.getCassandraSession(); + session.execute(""" + INSERT INTO dmc (id, value) VALUES(textAsBlob('contains-key'), textAsBlob('testvalue')) + """) + session.execute(""" + INSERT INTO dmc (id, value) VALUES(textAsBlob('delete-key'), textAsBlob('testvalue')) + """) + session.execute(""" + INSERT INTO dmc (id, value) VALUES(textAsBlob('get-and-put-key'), textAsBlob('testvalue')) + """) + } + + @AfterClass + static void cleanup() { + session.execute("TRUNCATE dmc") + } + + Serializer<String> serializer = { str, os -> + os.write(str.bytes) + } as Serializer + + Deserializer<String> deserializer = { input -> + new String(input) + } as Deserializer + + @Test + void testContainsKey() { + def contains = distributedMapCache.containsKey("contains-key", serializer) + assert contains + } + + @Test + void testGetAndPutIfAbsent() { + def result = distributedMapCache.getAndPutIfAbsent('get-and-put-key', 'testing', serializer, serializer, deserializer) + assert result == 'testvalue' + } + + @Test + void testRemove() { + distributedMapCache.remove("delete-key", serializer) + } + + @Test + void testGet() { + def result = distributedMapCache.get("contains-key", serializer, deserializer) + assert result == "testvalue" + } + + @Test + void testPut() { + distributedMapCache.put("put-key", "sometestdata", serializer, serializer) + Thread.sleep(1000) + assert distributedMapCache.containsKey("put-key", serializer) + } + + @Test + void testPutIfAbsent() { + assert distributedMapCache.putIfAbsent("put-if-absent-key", "testingthis", serializer, serializer) + assert !distributedMapCache.putIfAbsent("put-if-absent-key", "testingthis", serializer, serializer) + } +} diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/java/.gitignore b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/java/.gitignore new file mode 100644 index 0000000..e69de29 diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml index abd07ee..a8c2672 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml @@ -36,6 +36,11 @@ <artifactId>nifi-cassandra-services</artifactId> <version>1.13.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cassandra-distributedmapcache-service</artifactId> + <version>1.13.0-SNAPSHOT</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml index be2e23a..bb538c0 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml @@ -32,6 +32,7 @@ <modules> <module>nifi-cassandra-processors</module> <module>nifi-cassandra-nar</module> + <module>nifi-cassandra-distributedmapcache-service</module> <module>nifi-cassandra-services-api</module> <module>nifi-cassandra-services-api-nar</module> <module>nifi-cassandra-services</module>