/**
 * Lifecycle contract shared by the cache servers in this package: a server
 * can be started and later stopped, and either operation may fail with an
 * {@link IOException}.
 */
public interface CacheServer {

    /**
     * Starts the server.
     *
     * @throws IOException if the server cannot be started
     */
    void start() throws IOException;

    /**
     * Stops the server.
     *
     * @throws IOException if the server cannot be stopped cleanly
     */
    void stop() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java new file mode 100644 index 0000000..0f962d0 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.distributed.cache.server; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.annotation.OnConfigured; +import org.apache.nifi.processor.annotation.OnShutdown; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; + +public abstract class DistributedCacheServer extends AbstractControllerService { + public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used"; + public static final String EVICTION_STRATEGY_LRU = "Least Recently Used"; + public static final String EVICTION_STRATEGY_FIFO = "First In, First Out"; + + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("Port") + .description("The port to listen on for incoming connections") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .defaultValue("4557") + .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description( + "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure") + .required(false) + .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class)) + .build(); + public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder() + .name("Maximum Cache Entries") + .description("The maximum number of cache entries that the cache can hold") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000") + .build(); + public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder() + .name("Eviction 
Strategy") + .description("Determines which strategy should be used to evict values from the cache to make room for new entries") + .required(true) + .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO) + .defaultValue(EVICTION_STRATEGY_LFU) + .build(); + public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder() + .name("Persistence Directory") + .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only") + .required(false) + .addValidator(StandardValidators.createDirectoryExistsValidator(true, true)) + .build(); + + private volatile CacheServer cacheServer; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(PORT); + properties.add(MAX_CACHE_ENTRIES); + properties.add(EVICTION_POLICY); + properties.add(PERSISTENCE_PATH); + properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues( + getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build()); + return properties; + } + + @OnConfigured + public void startServer(final ConfigurationContext context) throws IOException { + if (cacheServer == null) { + cacheServer = createCacheServer(context); + cacheServer.start(); + } + } + + @OnShutdown + public void shutdownServer() throws IOException { + if (cacheServer != null) { + cacheServer.stop(); + } + cacheServer = null; + } + + @Override + protected void finalize() throws Throwable { + shutdownServer(); + } + + protected abstract CacheServer createCacheServer(ConfigurationContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java 
---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java new file mode 100644 index 0000000..426573f --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.distributed.cache.server; + +import java.io.File; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; + +public class DistributedSetCacheServer extends DistributedCacheServer { + + @Override + protected CacheServer createCacheServer(final ConfigurationContext context) { + final int port = context.getProperty(PORT).asInteger(); + final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue(); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger(); + final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue(); + + final SSLContext sslContext; + if ( sslContextService == null ) { + sslContext = null; + } else { + sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); + } + + final EvictionPolicy evictionPolicy; + switch (evictionPolicyName) { + case EVICTION_STRATEGY_FIFO: + evictionPolicy = EvictionPolicy.FIFO; + break; + case EVICTION_STRATEGY_LFU: + evictionPolicy = EvictionPolicy.LFU; + break; + case EVICTION_STRATEGY_LRU: + evictionPolicy = EvictionPolicy.LRU; + break; + default: + throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName); + } + + try { + final File persistenceDir = persistencePath == null ? 
null : new File(persistencePath); + + return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java new file mode 100644 index 0000000..60bd2c1 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.distributed.cache.server; + +import java.util.Comparator; + +public enum EvictionPolicy { + LFU(new LFUComparator()), + LRU(new LRUComparator()), + FIFO(new FIFOComparator()); + + private final Comparator<CacheRecord> comparator; + + private EvictionPolicy(final Comparator<CacheRecord> comparator) { + this.comparator = comparator; + } + + public Comparator<CacheRecord> getComparator() { + return comparator; + } + + public static class LFUComparator implements Comparator<CacheRecord> { + @Override + public int compare(final CacheRecord o1, final CacheRecord o2) { + if ( o1.equals(o2) ) { + return 0; + } + + final int hitCountComparison = Integer.compare(o1.getHitCount(), o2.getHitCount()); + final int entryDateComparison = (hitCountComparison == 0) ? Long.compare(o1.getEntryDate(), o2.getEntryDate()) : hitCountComparison; + return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison); + } + } + + public static class LRUComparator implements Comparator<CacheRecord> { + @Override + public int compare(final CacheRecord o1, final CacheRecord o2) { + if ( o1.equals(o2) ) { + return 0; + } + + final int lastHitDateComparison = Long.compare(o1.getLastHitDate(), o2.getLastHitDate()); + return (lastHitDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : lastHitDateComparison); + } + } + + public static class FIFOComparator implements Comparator<CacheRecord> { + @Override + public int compare(final CacheRecord o1, final CacheRecord o2) { + if ( o1.equals(o2) ) { + return 0; + } + + final int entryDateComparison = Long.compare(o1.getEntryDate(), o2.getEntryDate()); + return (entryDateComparison == 0 ? 
Long.compare(o1.getId(), o2.getId()) : entryDateComparison); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java new file mode 100644 index 0000000..5d2c0f6 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.distributed.cache.server; + +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.distributed.cache.server.set.PersistentSetCache; +import org.apache.nifi.distributed.cache.server.set.SetCache; +import org.apache.nifi.distributed.cache.server.set.SetCacheResult; +import org.apache.nifi.distributed.cache.server.set.SimpleSetCache; +import org.apache.nifi.io.DataOutputStream; + +public class SetCacheServer extends AbstractCacheServer { + + private final SetCache cache; + + public SetCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize, + final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException { + super(identifier, sslContext, port); + + final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, evictionPolicy); + + if (persistencePath == null) { + this.cache = simpleCache; + } else { + final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache); + persistentCache.restore(); + this.cache = persistentCache; + } + } + + @Override + protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException { + final DataInputStream dis = new DataInputStream(in); + final DataOutputStream dos = new DataOutputStream(out); + + final String action = dis.readUTF(); + if (action.equals("close")) { + return false; + } + + final int valueLength = dis.readInt(); + final byte[] value = new byte[valueLength]; + dis.readFully(value); + final ByteBuffer valueBuffer = ByteBuffer.wrap(value); + + final SetCacheResult response; + switch (action) { + case "addIfAbsent": + response = cache.addIfAbsent(valueBuffer); + break; + case "contains": + response = cache.contains(valueBuffer); + break; + case "remove": + response 
= cache.remove(valueBuffer); + break; + default: + throw new IOException("IllegalRequest"); + } + + dos.writeBoolean(response.getResult()); + dos.flush(); + + return true; + } + + @Override + public void stop() throws IOException { + try { + super.stop(); + } finally { + cache.shutdown(); + } + } + + @Override + protected void finalize() throws Throwable { + if (!stopped) + stop(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java new file mode 100644 index 0000000..920529d --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import java.io.File; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.server.CacheServer; +import org.apache.nifi.distributed.cache.server.DistributedCacheServer; +import org.apache.nifi.distributed.cache.server.EvictionPolicy; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; + +public class DistributedMapCacheServer extends DistributedCacheServer { + + @Override + protected CacheServer createCacheServer(final ConfigurationContext context) { + final int port = context.getProperty(PORT).asInteger(); + final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue(); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger(); + final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue(); + + final SSLContext sslContext; + if ( sslContextService == null ) { + sslContext = null; + } else { + sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); + } + + final EvictionPolicy evictionPolicy; + switch (evictionPolicyName) { + case EVICTION_STRATEGY_FIFO: + evictionPolicy = EvictionPolicy.FIFO; + break; + case EVICTION_STRATEGY_LFU: + evictionPolicy = EvictionPolicy.LFU; + break; + case EVICTION_STRATEGY_LRU: + evictionPolicy = EvictionPolicy.LRU; + break; + 
default: + throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName); + } + + try { + final File persistenceDir = persistencePath == null ? null : new File(persistencePath); + + return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java new file mode 100644 index 0000000..534cb0b --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface MapCache { + + MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException; + boolean containsKey(ByteBuffer key) throws IOException; + ByteBuffer get(ByteBuffer key) throws IOException; + ByteBuffer remove(ByteBuffer key) throws IOException; + void shutdown() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java new file mode 100644 index 0000000..b0ab0c4 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import java.nio.ByteBuffer; + +import org.apache.nifi.distributed.cache.server.CacheRecord; + +public class MapCacheRecord extends CacheRecord { + private final ByteBuffer key; + private final ByteBuffer value; + + public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) { + this.key = key; + this.value = value; + } + + public ByteBuffer getKey() { + return key; + } + + public ByteBuffer getValue() { + return value; + } + + @Override + public int hashCode() { + return 2938476 + key.hashCode() * value.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if ( obj == this ) { + return true; + } + + if ( obj instanceof MapCacheRecord ) { + final MapCacheRecord that = ((MapCacheRecord) obj); + return key.equals(that.key) && value.equals(that.value); + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java new file mode 100644 index 
0000000..3e8dd0e --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.distributed.cache.server.map; + +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.distributed.cache.server.AbstractCacheServer; +import org.apache.nifi.distributed.cache.server.EvictionPolicy; +import org.apache.nifi.io.DataOutputStream; + +public class MapCacheServer extends AbstractCacheServer { + + private final MapCache cache; + + public MapCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize, + final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException { + super(identifier, sslContext, port); + + final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, evictionPolicy); + + if (persistencePath == null) { + this.cache = simpleCache; + } else { + final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache); + persistentCache.restore(); + this.cache = persistentCache; + } + } + + @Override + protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException { + final DataInputStream dis = new DataInputStream(in); + final DataOutputStream dos = new DataOutputStream(out); + final String action = dis.readUTF(); + try { + switch (action) { + case "close": { + return false; + } + case "putIfAbsent": { + final byte[] key = readValue(dis); + final byte[] value = readValue(dis); + final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + dos.writeBoolean(putResult.isSuccessful()); + break; + } + case "containsKey": { + final byte[] key = readValue(dis); + final boolean contains = cache.containsKey(ByteBuffer.wrap(key)); + dos.writeBoolean(contains); + break; + } + case "getAndPutIfAbsent": { + final byte[] key = readValue(dis); + final byte[] value = 
readValue(dis); + + final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + if (putResult.isSuccessful()) { + // Put was successful. There was no old value to get. + dos.writeInt(0); + } else { + // we didn't put. Write back the previous value + final byte[] byteArray = putResult.getExistingValue().array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + + break; + } + case "get": { + final byte[] key = readValue(dis); + final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key)); + if (existingValue == null) { + // there was no existing value; we did a "put". + dos.writeInt(0); + } else { + // a value already existed. we did not update the map + final byte[] byteArray = existingValue.array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + + break; + } + case "remove": { + final byte[] key = readValue(dis); + final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null; + dos.writeBoolean(removed); + break; + } + default: { + throw new IOException("Illegal Request"); + } + } + } finally { + dos.flush(); + } + + return true; + } + + @Override + public void stop() throws IOException { + try { + super.stop(); + } finally { + cache.shutdown(); + } + } + + @Override + protected void finalize() throws Throwable { + if (!stopped) + stop(); + } + + private byte[] readValue(final DataInputStream dis) throws IOException { + final int numBytes = dis.readInt(); + final byte[] buffer = new byte[numBytes]; + dis.readFully(buffer); + return buffer; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java ---------------------------------------------------------------------- diff --git 
a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java new file mode 100644 index 0000000..29695eb --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Immutable description of what a put-if-absent attempt on a map cache did: whether the entry was
 * stored, the key/value that were offered, the value that was already present (if any) and the entry
 * that was evicted to make room (if any).
 */
public class MapPutResult {

    // Outcome of the attempt.
    private final boolean stored;

    // The entry the caller offered.
    private final ByteBuffer offeredKey;
    private final ByteBuffer offeredValue;

    // The value that was already cached under the key; may be null.
    private final ByteBuffer previousValue;

    // The entry displaced by the put; both may be null.
    private final ByteBuffer displacedKey;
    private final ByteBuffer displacedValue;

    /**
     * @param successful whether the offered entry was stored
     * @param key the offered key
     * @param value the offered value
     * @param existingValue the value already held for the key, or {@code null}
     * @param evictedKey key of the evicted entry, or {@code null}
     * @param evictedValue value of the evicted entry, or {@code null}
     */
    public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value,
            final ByteBuffer existingValue, final ByteBuffer evictedKey, final ByteBuffer evictedValue) {
        this.displacedValue = evictedValue;
        this.displacedKey = evictedKey;
        this.previousValue = existingValue;
        this.offeredValue = value;
        this.offeredKey = key;
        this.stored = successful;
    }

    /** @return whether the offered entry was stored */
    public boolean isSuccessful() {
        return this.stored;
    }

    /** @return the key that was offered */
    public ByteBuffer getKey() {
        return this.offeredKey;
    }

    /** @return the value that was offered */
    public ByteBuffer getValue() {
        return this.offeredValue;
    }

    /** @return the value that was already present, or {@code null} */
    public ByteBuffer getExistingValue() {
        return this.previousValue;
    }

    /** @return the key of the evicted entry, or {@code null} */
    public ByteBuffer getEvictedKey() {
        return this.displacedKey;
    }

    /** @return the value of the evicted entry, or {@code null} */
    public ByteBuffer getEvictedValue() {
        return this.displacedValue;
    }
}
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.wali.MinimalLockingWriteAheadLog; +import org.wali.SerDe; +import org.wali.UpdateType; +import org.wali.WriteAheadRepository; + +public class PersistentMapCache implements MapCache { + + private final MapCache wrapped; + private final WriteAheadRepository<MapWaliRecord> wali; + + private final AtomicLong modifications = new AtomicLong(0L); + + public PersistentMapCache(final String serviceIdentifier, final File persistencePath, final MapCache cacheToWrap) throws IOException { + wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null); + wrapped = cacheToWrap; + } + + synchronized void restore() throws IOException { + final Collection<MapWaliRecord> recovered = wali.recoverRecords(); + for ( final MapWaliRecord record : recovered ) { + if ( 
record.getUpdateType() == UpdateType.CREATE ) { + wrapped.putIfAbsent(record.getKey(), record.getValue()); + } + } + } + + @Override + public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException { + final MapPutResult putResult = wrapped.putIfAbsent(key, value); + if ( putResult.isSuccessful() ) { + // The put was successful. + final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value); + final List<MapWaliRecord> records = new ArrayList<>(); + records.add(record); + + if ( putResult.getEvictedKey() != null ) { + records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue())); + } + + wali.update(Collections.singletonList(record), false); + + final long modCount = modifications.getAndIncrement(); + if ( modCount > 0 && modCount % 100000 == 0 ) { + wali.checkpoint(); + } + } + + return putResult; + } + + @Override + public boolean containsKey(final ByteBuffer key) throws IOException { + return wrapped.containsKey(key); + } + + @Override + public ByteBuffer get(final ByteBuffer key) throws IOException { + return wrapped.get(key); + } + + @Override + public ByteBuffer remove(ByteBuffer key) throws IOException { + final ByteBuffer removeResult = wrapped.remove(key); + if ( removeResult != null ) { + final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult); + final List<MapWaliRecord> records = new ArrayList<>(1); + records.add(record); + wali.update(records, false); + + final long modCount = modifications.getAndIncrement(); + if ( modCount > 0 && modCount % 1000 == 0 ) { + wali.checkpoint(); + } + } + return removeResult; + } + + + @Override + public void shutdown() throws IOException { + wali.shutdown(); + } + + + private static class MapWaliRecord { + private final UpdateType updateType; + private final ByteBuffer key; + private final ByteBuffer value; + + public MapWaliRecord(final UpdateType updateType, final ByteBuffer key, final ByteBuffer 
value) { + this.updateType = updateType; + this.key = key; + this.value = value; + } + + public UpdateType getUpdateType() { + return updateType; + } + + public ByteBuffer getKey() { + return key; + } + + public ByteBuffer getValue() { + return value; + } + } + + private static class Serde implements SerDe<MapWaliRecord> { + + @Override + public void serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException { + final UpdateType updateType = newRecordState.getUpdateType(); + if ( updateType == UpdateType.DELETE ) { + out.write(0); + } else { + out.write(1); + } + + final byte[] key = newRecordState.getKey().array(); + final byte[] value = newRecordState.getValue().array(); + + out.writeInt(key.length); + out.write(key); + out.writeInt(value.length); + out.write(value); + } + + @Override + public void serializeRecord(MapWaliRecord record, java.io.DataOutputStream out) throws IOException { + serializeEdit(null, record, out); + } + + @Override + public MapWaliRecord deserializeEdit(final DataInputStream in, final Map<Object, MapWaliRecord> currentRecordStates, final int version) throws IOException { + final int updateTypeValue = in.read(); + if ( updateTypeValue < 0 ) { + throw new EOFException(); + } + + final UpdateType updateType = (updateTypeValue == 0 ? 
UpdateType.DELETE : UpdateType.CREATE); + + final int keySize = in.readInt(); + final byte[] key = new byte[keySize]; + in.readFully(key); + + final int valueSize = in.readInt(); + final byte[] value = new byte[valueSize]; + in.readFully(value); + + return new MapWaliRecord(updateType, ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + } + + @Override + public MapWaliRecord deserializeRecord(DataInputStream in, int version) throws IOException { + return deserializeEdit(in, new HashMap<Object, MapWaliRecord>(), version); + } + + @Override + public Object getRecordIdentifier(final MapWaliRecord record) { + return record.getKey(); + } + + @Override + public UpdateType getUpdateType(final MapWaliRecord record) { + return record.getUpdateType(); + } + + @Override + public String getLocation(final MapWaliRecord record) { + return null; + } + + @Override + public int getVersion() { + return 1; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java new file mode 100644 index 0000000..10139f1 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.nifi.distributed.cache.server.EvictionPolicy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SimpleMapCache implements MapCache { + private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class); + + private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>(); + private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap; + + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + private final String serviceIdentifier; + + private final int maxSize; + + public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) { + // need to change to ConcurrentMap as this is modified when only the readLock 
is held + inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator()); + this.serviceIdentifier = serviceIdentifier; + this.maxSize = maxSize; + } + + @Override + public String toString() { + return "SimpleSetCache[service id=" + serviceIdentifier + "]"; + } + + // don't need synchronized because this method is only called when the writeLock is held, and all + // public methods obtain either the read or write lock + private MapCacheRecord evict() { + if ( cache.size() < maxSize ) { + return null; + } + + final MapCacheRecord recordToEvict = inverseCacheMap.firstKey(); + final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict); + cache.remove(valueToEvict); + + if ( logger.isDebugEnabled() ) { + logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8)); + } + + return recordToEvict; + } + + @Override + public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) { + writeLock.lock(); + try { + final MapCacheRecord record = cache.get(key); + if ( record == null ) { + // Record is null. We will add. + final MapCacheRecord evicted = evict(); + final MapCacheRecord newRecord = new MapCacheRecord(key, value); + cache.put(key, newRecord); + inverseCacheMap.put(newRecord, key); + + if ( evicted == null ) { + return new MapPutResult(true, key, value, null, null, null); + } else { + return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue()); + } + } + + // Record is not null. Increment hit count and return result indicating that record was not added. 
+ inverseCacheMap.remove(record); + record.hit(); + inverseCacheMap.put(record, key); + + return new MapPutResult(false, key, value, record.getValue(), null, null); + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean containsKey(final ByteBuffer key) { + readLock.lock(); + try { + final MapCacheRecord record = cache.get(key); + if ( record == null ) { + return false; + } + + inverseCacheMap.remove(record); + record.hit(); + inverseCacheMap.put(record, key); + + return true; + } finally { + readLock.unlock(); + } + } + + @Override + public ByteBuffer get(final ByteBuffer key) { + readLock.lock(); + try { + final MapCacheRecord record = cache.get(key); + if ( record == null ) { + return null; + } + + inverseCacheMap.remove(record); + record.hit(); + inverseCacheMap.put(record, key); + + return record.getValue(); + } finally { + readLock.unlock(); + } + } + + @Override + public ByteBuffer remove(ByteBuffer key) throws IOException { + writeLock.lock(); + try { + final MapCacheRecord record = cache.remove(key); + if (record == null) { + return null; + } + inverseCacheMap.remove(record); + return record.getValue(); + } finally { + writeLock.unlock(); + } + } + + @Override + public void shutdown() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java new file mode 100644 index 0000000..4d75fc0 --- /dev/null +++ 
b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.set; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.wali.MinimalLockingWriteAheadLog; +import org.wali.SerDe; +import org.wali.UpdateType; +import org.wali.WriteAheadRepository; + +public class PersistentSetCache implements SetCache { + + private final SetCache wrapped; + private final WriteAheadRepository<SetRecord> wali; + + private final AtomicLong modifications = new AtomicLong(0L); + + public PersistentSetCache(final String serviceIdentifier, final File persistencePath, final SetCache cacheToWrap) throws IOException { + wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), 
null); + wrapped = cacheToWrap; + } + + public synchronized void restore() throws IOException { + final Collection<SetRecord> recovered = wali.recoverRecords(); + for ( final SetRecord record : recovered ) { + if ( record.getUpdateType() == UpdateType.CREATE ) { + addIfAbsent(record.getBuffer()); + } + } + } + + @Override + public synchronized SetCacheResult remove(final ByteBuffer value) throws IOException { + final SetCacheResult removeResult = wrapped.remove(value); + if ( removeResult.getResult() ) { + final SetRecord record = new SetRecord(UpdateType.DELETE, value); + final List<SetRecord> records = new ArrayList<>(); + records.add(record); + wali.update(records, false); + + final long modCount = modifications.getAndIncrement(); + if ( modCount > 0 && modCount % 1000 == 0 ) { + wali.checkpoint(); + } + } + + return removeResult; + } + + @Override + public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) throws IOException { + final SetCacheResult addResult = wrapped.addIfAbsent(value); + if ( addResult.getResult() ) { + final SetRecord record = new SetRecord(UpdateType.CREATE, value); + final List<SetRecord> records = new ArrayList<>(); + records.add(record); + + final SetCacheRecord evictedRecord = addResult.getEvictedRecord(); + if ( evictedRecord != null ) { + records.add(new SetRecord(UpdateType.DELETE, evictedRecord.getValue())); + } + + wali.update(records, false); + + final long modCount = modifications.getAndIncrement(); + if ( modCount > 0 && modCount % 1000 == 0 ) { + wali.checkpoint(); + } + } + + return addResult; + } + + @Override + public synchronized SetCacheResult contains(final ByteBuffer value) throws IOException { + return wrapped.contains(value); + } + + @Override + public void shutdown() throws IOException { + wali.shutdown(); + } + + private static class SetRecord { + private final UpdateType updateType; + private final ByteBuffer value; + + public SetRecord(final UpdateType updateType, final ByteBuffer value) { + 
this.updateType = updateType; + this.value = value; + } + + public UpdateType getUpdateType() { + return updateType; + } + + public ByteBuffer getBuffer() { + return value; + } + + public byte[] getData() { + return value.array(); + } + } + + private static class Serde implements SerDe<SetRecord> { + + @Override + public void serializeEdit(final SetRecord previousRecordState, final SetRecord newRecordState, final DataOutputStream out) throws IOException { + final UpdateType updateType = newRecordState.getUpdateType(); + if ( updateType == UpdateType.DELETE ) { + out.write(0); + } else { + out.write(1); + } + + final byte[] data = newRecordState.getData(); + out.writeInt(data.length); + out.write(newRecordState.getData()); + } + + @Override + public void serializeRecord(SetRecord record, DataOutputStream out) throws IOException { + serializeEdit(null, record, out); + } + + @Override + public SetRecord deserializeEdit(final DataInputStream in, final Map<Object, SetRecord> currentRecordStates, final int version) throws IOException { + final int value = in.read(); + if ( value < 0 ) { + throw new EOFException(); + } + + final UpdateType updateType = (value == 0 ? 
UpdateType.DELETE : UpdateType.CREATE); + + final int size = in.readInt(); + final byte[] data = new byte[size]; + in.readFully(data); + + return new SetRecord(updateType, ByteBuffer.wrap(data)); + } + + @Override + public SetRecord deserializeRecord(DataInputStream in, int version) throws IOException { + return deserializeEdit(in, new HashMap<Object, SetRecord>(), version); + } + + @Override + public Object getRecordIdentifier(final SetRecord record) { + return record.getBuffer(); + } + + @Override + public UpdateType getUpdateType(final SetRecord record) { + return record.getUpdateType(); + } + + @Override + public String getLocation(final SetRecord record) { + return null; + } + + @Override + public int getVersion() { + return 1; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java new file mode 100644 index 0000000..bf6ae3e --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * Operations of a cache that holds a set of values, each identified by its bytes.
 * <p>
 * Every operation reports its outcome as a {@link SetCacheResult}. In the implementations that
 * accompany this interface, {@code getResult()} is {@code true} when the value was removed, was
 * added, or is present, respectively.
 */
public interface SetCache {

    /**
     * Removes the value from the cache.
     *
     * @param value the value to remove
     * @throws IOException declared for implementations that perform I/O
     */
    SetCacheResult remove(ByteBuffer value) throws IOException;

    /**
     * Adds the value to the cache unless it is already there.
     *
     * @param value the value to add
     * @throws IOException declared for implementations that perform I/O
     */
    SetCacheResult addIfAbsent(ByteBuffer value) throws IOException;

    /**
     * Checks whether the value is in the cache.
     *
     * @param value the value to look up
     * @throws IOException declared for implementations that perform I/O
     */
    SetCacheResult contains(ByteBuffer value) throws IOException;

    /**
     * Releases whatever the implementation holds; the in-memory implementation does nothing here.
     *
     * @throws IOException declared for implementations that perform I/O
     */
    void shutdown() throws IOException;

}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.set; + +import java.nio.ByteBuffer; + +import org.apache.nifi.distributed.cache.server.CacheRecord; + +public class SetCacheRecord extends CacheRecord { + private final ByteBuffer value; + + public SetCacheRecord(final ByteBuffer value) { + this.value = value; + } + + public ByteBuffer getValue() { + return value; + } + + @Override + public int hashCode() { + return value.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if ( this == obj ) { + return true; + } + + if (obj instanceof SetCacheRecord) { + return value.equals(((SetCacheRecord) obj).value); + } + return false; + } + + @Override + public String toString() { + return "SetCacheRecord[value=" + new String(value.array()) + ", hitCount=" + getHitCount() + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java 
b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java new file mode 100644 index 0000000..732c4f0 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.distributed.cache.server.set; + + + +public class SetCacheResult { + private final boolean result; + private final SetCacheRecord stats; + private final SetCacheRecord evictedRecord; + + public SetCacheResult(final boolean result, final SetCacheRecord stats, final SetCacheRecord evictedRecord) { + this.result = result; + this.stats = stats; + this.evictedRecord = evictedRecord; + } + + public boolean getResult() { + return result; + } + + public SetCacheRecord getRecord() { + return stats; + } + + public SetCacheRecord getEvictedRecord() { + return evictedRecord; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java new file mode 100644 index 0000000..77d6481 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.set; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.nifi.distributed.cache.server.EvictionPolicy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SimpleSetCache implements SetCache { + private static final Logger logger = LoggerFactory.getLogger(SimpleSetCache.class); + + private final Map<ByteBuffer, SetCacheRecord> cache = new HashMap<>(); + private final SortedMap<SetCacheRecord, ByteBuffer> inverseCacheMap; + + private final String serviceIdentifier; + + private final int maxSize; + + public SimpleSetCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) { + inverseCacheMap = new TreeMap<>(evictionPolicy.getComparator()); + this.serviceIdentifier = serviceIdentifier; + this.maxSize = maxSize; + } + + private synchronized SetCacheRecord evict() { + if ( cache.size() < maxSize ) { + return null; + } + + final SetCacheRecord recordToEvict = inverseCacheMap.firstKey(); + final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict); + cache.remove(valueToEvict); + + if ( logger.isDebugEnabled() ) { + logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8)); + } + + return recordToEvict; + } + + @Override + public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) { + final 
SetCacheRecord record = cache.get(value); + if ( record == null ) { + final SetCacheRecord evicted = evict(); + final SetCacheRecord newRecord = new SetCacheRecord(value); + cache.put(value, newRecord); + inverseCacheMap.put(newRecord, value); + return new SetCacheResult(true, newRecord, evicted); + } else { + // We have to remove the record and add it again in order to cause the Map to stay sorted + inverseCacheMap.remove(record); + record.hit(); + inverseCacheMap.put(record, value); + + return new SetCacheResult(false, record, null); + } + } + + @Override + public synchronized SetCacheResult contains(final ByteBuffer value) { + final SetCacheRecord record = cache.get(value); + if ( record == null ) { + return new SetCacheResult(false, null, null); + } else { + // We have to remove the record and add it again in order to cause the Map to stay sorted + inverseCacheMap.remove(record); + record.hit(); + inverseCacheMap.put(record, value); + + return new SetCacheResult(true, record, null); + } + } + + @Override + public synchronized SetCacheResult remove(final ByteBuffer value) { + final SetCacheRecord record = cache.remove(value); + if ( record == null ) { + return new SetCacheResult(false, null, null); + } else { + inverseCacheMap.remove(record); + return new SetCacheResult(true, record, null); + } + } + + @Override + public String toString() { + return "SimpleSetCache[service id=" + serviceIdentifier + "]"; + } + + @Override + public void shutdown() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService 
b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..0509c7c --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+org.apache.nifi.distributed.cache.server.DistributedSetCacheServer +org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html new file mode 100644 index 0000000..dca3aa1 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html @@ -0,0 +1,82 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<head> +<meta charset="utf-8" /> +<title>Distributed Map Cache Server</title> +<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> +</head> + +<body> + <h2>Description:</h2> + + <p>A Controller Service that starts an embedded server and listens for connections from clients. The + server provides the ability to query the cache, add data to the cache, and remove data from the cache.</p> + + + + <p> + <strong>Properties:</strong> + </p> + <p>In the list below, the names of required properties appear + in bold. Any other properties (not in bold) are considered optional. + If a property has a default value, it is indicated. If a property + supports the use of the NiFi Expression Language (or simply, + "expression language"), that is also indicated.</p> + + <ul> + <li><strong>Port</strong> + <ul> + <li>The port to listen on for incoming connections</li> + <li>Default value: 4557</li> + <li>Supports expression language: false</li> + </ul></li> + <li>SSL Context Service + <ul> + <li>If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure</li> + <li>Default value: no default</li> + <li>Supports expression language: false</li> + </ul></li> + <li><strong>Maximum Cache Entries</strong> + <ul> + <li>The maximum number of cache entries that the cache can hold</li> + <li>Default value: 10,000</li> + <li>Supports expression language: false</li> + </ul></li> + <li><strong>Eviction Strategy</strong> + <ul> + <li>Determines which strategy should be used to evict values from the cache to make room for new entries. 
Valid values: + <code>Least Frequently Used</code>, <code>Least Recently Used</code>, and <code>First In, First Out</code></li> + <li>Default value: Least Frequently Used</li> + <li>Supports expression language: false</li> + </ul></li> + <li>Persistence Directory + <ul> + <li>If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only</li> + <li>Default value: no default (in-memory)</li> + <li>Supports expression language: true - JVM and System Properties Only</li> + </ul></li> + </ul> + + + <i>See Also:</i> + <ul> + <li><a href="../org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html">Distributed Map Cache Client Service</a></li> + <li><a href="../org.apache.nifi.ssl.StandardSSLContextService/index.html">Standard SSL Context Service</a></li> + </ul> + +</body> +</html>