http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java deleted file mode 100644 index 8049d42..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.distributed.cache.protocol.exception; - -public class HandshakeException extends Exception { - public HandshakeException(final String message) { - super(message); - } - - public HandshakeException(final Throwable cause) { - super(cause); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml deleted file mode 100644 index 5dec322..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml +++ /dev/null @@ -1,81 +0,0 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>distributed-cache-services-bundle</artifactId> - <version>0.0.1-SNAPSHOT</version> - </parent> - - <artifactId>distributed-cache-server</artifactId> - - <name>Distributed Cache Server</name> - <description>Provides a Controller Service for hosting Distributed Caches</description> - - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>distributed-cache-protocol</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>remote-communications-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-processor-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-stream-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>ssl-context-service-api</artifactId> - </dependency> - <dependency> - <groupId>wali</groupId> - <artifactId>wali</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>distributed-cache-client-service-api</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>distributed-cache-client-service</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-mock</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>ssl-context-service</artifactId> - </dependency> - </dependencies> - -</project> \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java deleted file mode 100644 index 9b4e70e..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.distributed.cache.server; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; - -import javax.net.ssl.SSLContext; - -import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake; -import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException; -import org.apache.nifi.io.BufferedInputStream; -import org.apache.nifi.io.BufferedOutputStream; -import org.apache.nifi.remote.StandardVersionNegotiator; -import org.apache.nifi.remote.VersionNegotiator; -import org.apache.nifi.remote.io.socket.SocketChannelInputStream; -import org.apache.nifi.remote.io.socket.SocketChannelOutputStream; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class AbstractCacheServer implements CacheServer { - - private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class); - - private final String identifier; - private final int port; - private final SSLContext sslContext; - protected volatile boolean stopped = false; - private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();; - - private volatile ServerSocketChannel serverSocketChannel; - - public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port) { - this.identifier = identifier; - this.port = port; - this.sslContext = sslContext; - } - - @Override - public void start() throws IOException { - serverSocketChannel = ServerSocketChannel.open(); - serverSocketChannel.configureBlocking(true); - 
serverSocketChannel.bind(new InetSocketAddress(port)); - - final Runnable runnable = new Runnable() { - - @Override - public void run() { - while (true) { - final SocketChannel socketChannel; - try { - socketChannel = serverSocketChannel.accept(); - logger.debug("Connected to {}", new Object[] { socketChannel }); - } catch (final IOException e) { - if (!stopped) { - logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString()); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - } - return; - } - - final Runnable processInputRunnable = new Runnable() { - @Override - public void run() { - final InputStream rawInputStream; - final OutputStream rawOutputStream; - final String peer = socketChannel.socket().getInetAddress().getHostName(); - - try { - if (sslContext == null) { - rawInputStream = new SocketChannelInputStream(socketChannel); - rawOutputStream = new SocketChannelOutputStream(socketChannel); - } else { - final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false); - sslSocketChannel.connect(); - rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel); - rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel); - } - } catch (IOException e) { - logger.error("Cannot create input and/or output streams for {}", new Object[] { identifier }, e); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - try { - socketChannel.close(); - } catch (IOException swallow) { - } - - return; - } - try (final InputStream in = new BufferedInputStream(rawInputStream); - final OutputStream out = new BufferedOutputStream(rawOutputStream)) { - - final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); - - ProtocolHandshake.receiveHandshake(in, out, versionNegotiator); - - boolean continueComms = true; - while (continueComms) { - continueComms = listen(in, out, versionNegotiator.getVersion()); - } - // client has issued 'close' - logger.debug("Client 
issued close on {}", new Object[] { socketChannel }); - } catch (final SocketTimeoutException e) { - logger.debug("30 sec timeout reached", e); - } catch (final IOException | HandshakeException e) { - if (!stopped) { - logger.error("{} unable to communicate with remote peer {} due to {}", new Object[] { this, peer, e.toString() }); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - } - } finally { - processInputThreads.remove(Thread.currentThread()); - } - } - }; - - final Thread processInputThread = new Thread(processInputRunnable); - processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier); - processInputThread.setDaemon(true); - processInputThread.start(); - processInputThreads.add(processInputThread); - } - } - }; - - final Thread thread = new Thread(runnable); - thread.setDaemon(true); - thread.setName("Distributed Cache Server: " + identifier); - thread.start(); - } - - @Override - public void stop() throws IOException { - stopped = true; - logger.info("Stopping CacheServer {}", new Object[] { this.identifier }); - - if (serverSocketChannel != null) { - serverSocketChannel.close(); - } - // need to close out the created SocketChannels...this is done by interrupting - // the created threads that loop on listen(). 
- for (Thread processInputThread : processInputThreads) { - processInputThread.interrupt(); - int i = 0; - while (!processInputThread.isInterrupted() && i++ < 5) { - try { - Thread.sleep(50); // allow thread to gracefully terminate - } catch (InterruptedException e) { - } - } - } - processInputThreads.clear(); - } - - @Override - public String toString() { - return "CacheServer[id=" + identifier + "]"; - } - - /** - * Listens for incoming data and communicates with remote peer - * - * @param in - * @param out - * @param version - * @return <code>true</code> if communications should continue, <code>false</code> otherwise - * @throws IOException - */ - protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java deleted file mode 100644 index 71ac56d..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.distributed.cache.server; - -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -public class CacheRecord { - - private static final AtomicLong idGenerator = new AtomicLong(0L); - - private final long id; - private final long entryDate; - private volatile long lastHitDate; - private final AtomicInteger hitCount = new AtomicInteger(0); - - public CacheRecord() { - entryDate = System.currentTimeMillis(); - lastHitDate = entryDate; - id = idGenerator.getAndIncrement(); - } - - public long getEntryDate() { - return entryDate; - } - - public long getLastHitDate() { - return lastHitDate; - } - - public int getHitCount() { - return hitCount.get(); - } - - public void hit() { - hitCount.getAndIncrement(); - lastHitDate = System.currentTimeMillis(); - } - - public long getId() { - return id; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java deleted file mode 100644 index 2c85cd8..0000000 --- 
a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.distributed.cache.server; - -import java.io.IOException; - -public interface CacheServer { - - void start() throws IOException; - void stop() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java deleted file mode 100644 index 0f962d0..0000000 --- 
a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.distributed.cache.server; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.controller.annotation.OnConfigured; -import org.apache.nifi.processor.annotation.OnShutdown; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.ssl.SSLContextService; - -public abstract class DistributedCacheServer extends AbstractControllerService { - public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used"; - public static final String EVICTION_STRATEGY_LRU = "Least Recently Used"; - public static final String EVICTION_STRATEGY_FIFO = "First In, First Out"; - - public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() - .name("Port") - .description("The port to listen on for incoming 
connections") - .required(true) - .addValidator(StandardValidators.PORT_VALIDATOR) - .defaultValue("4557") - .build(); - public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description( - "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure") - .required(false) - .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class)) - .build(); - public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder() - .name("Maximum Cache Entries") - .description("The maximum number of cache entries that the cache can hold") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10000") - .build(); - public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder() - .name("Eviction Strategy") - .description("Determines which strategy should be used to evict values from the cache to make room for new entries") - .required(true) - .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO) - .defaultValue(EVICTION_STRATEGY_LFU) - .build(); - public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder() - .name("Persistence Directory") - .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only") - .required(false) - .addValidator(StandardValidators.createDirectoryExistsValidator(true, true)) - .build(); - - private volatile CacheServer cacheServer; - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(PORT); - properties.add(MAX_CACHE_ENTRIES); - properties.add(EVICTION_POLICY); - properties.add(PERSISTENCE_PATH); - 
properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues( - getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build()); - return properties; - } - - @OnConfigured - public void startServer(final ConfigurationContext context) throws IOException { - if (cacheServer == null) { - cacheServer = createCacheServer(context); - cacheServer.start(); - } - } - - @OnShutdown - public void shutdownServer() throws IOException { - if (cacheServer != null) { - cacheServer.stop(); - } - cacheServer = null; - } - - @Override - protected void finalize() throws Throwable { - shutdownServer(); - } - - protected abstract CacheServer createCacheServer(ConfigurationContext context); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java deleted file mode 100644 index 426573f..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.distributed.cache.server; - -import java.io.File; - -import javax.net.ssl.SSLContext; - -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.ssl.SSLContextService.ClientAuth; - -public class DistributedSetCacheServer extends DistributedCacheServer { - - @Override - protected CacheServer createCacheServer(final ConfigurationContext context) { - final int port = context.getProperty(PORT).asInteger(); - final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue(); - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger(); - final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue(); - - final SSLContext sslContext; - if ( sslContextService == null ) { - sslContext = null; - } else { - sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); - } - - final EvictionPolicy evictionPolicy; - switch (evictionPolicyName) { - case EVICTION_STRATEGY_FIFO: - evictionPolicy = EvictionPolicy.FIFO; - break; - case EVICTION_STRATEGY_LFU: - evictionPolicy = EvictionPolicy.LFU; - break; - case EVICTION_STRATEGY_LRU: - evictionPolicy = EvictionPolicy.LRU; - break; - default: - throw new 
IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName); - } - - try { - final File persistenceDir = persistencePath == null ? null : new File(persistencePath); - - return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java deleted file mode 100644 index 60bd2c1..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.distributed.cache.server; - -import java.util.Comparator; - -public enum EvictionPolicy { - LFU(new LFUComparator()), - LRU(new LRUComparator()), - FIFO(new FIFOComparator()); - - private final Comparator<CacheRecord> comparator; - - private EvictionPolicy(final Comparator<CacheRecord> comparator) { - this.comparator = comparator; - } - - public Comparator<CacheRecord> getComparator() { - return comparator; - } - - public static class LFUComparator implements Comparator<CacheRecord> { - @Override - public int compare(final CacheRecord o1, final CacheRecord o2) { - if ( o1.equals(o2) ) { - return 0; - } - - final int hitCountComparison = Integer.compare(o1.getHitCount(), o2.getHitCount()); - final int entryDateComparison = (hitCountComparison == 0) ? Long.compare(o1.getEntryDate(), o2.getEntryDate()) : hitCountComparison; - return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison); - } - } - - public static class LRUComparator implements Comparator<CacheRecord> { - @Override - public int compare(final CacheRecord o1, final CacheRecord o2) { - if ( o1.equals(o2) ) { - return 0; - } - - final int lastHitDateComparison = Long.compare(o1.getLastHitDate(), o2.getLastHitDate()); - return (lastHitDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : lastHitDateComparison); - } - } - - public static class FIFOComparator implements Comparator<CacheRecord> { - @Override - public int compare(final CacheRecord o1, final CacheRecord o2) { - if ( o1.equals(o2) ) { - return 0; - } - - final int entryDateComparison = Long.compare(o1.getEntryDate(), o2.getEntryDate()); - return (entryDateComparison == 0 ? 
Long.compare(o1.getId(), o2.getId()) : entryDateComparison); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java deleted file mode 100644 index 5d2c0f6..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.distributed.cache.server; - -import java.io.DataInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; - -import javax.net.ssl.SSLContext; - -import org.apache.nifi.distributed.cache.server.set.PersistentSetCache; -import org.apache.nifi.distributed.cache.server.set.SetCache; -import org.apache.nifi.distributed.cache.server.set.SetCacheResult; -import org.apache.nifi.distributed.cache.server.set.SimpleSetCache; -import org.apache.nifi.io.DataOutputStream; - -public class SetCacheServer extends AbstractCacheServer { - - private final SetCache cache; - - public SetCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize, - final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException { - super(identifier, sslContext, port); - - final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, evictionPolicy); - - if (persistencePath == null) { - this.cache = simpleCache; - } else { - final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache); - persistentCache.restore(); - this.cache = persistentCache; - } - } - - @Override - protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException { - final DataInputStream dis = new DataInputStream(in); - final DataOutputStream dos = new DataOutputStream(out); - - final String action = dis.readUTF(); - if (action.equals("close")) { - return false; - } - - final int valueLength = dis.readInt(); - final byte[] value = new byte[valueLength]; - dis.readFully(value); - final ByteBuffer valueBuffer = ByteBuffer.wrap(value); - - final SetCacheResult response; - switch (action) { - case "addIfAbsent": - response = cache.addIfAbsent(valueBuffer); - break; - case "contains": - response = cache.contains(valueBuffer); - break; - case "remove": - response 
= cache.remove(valueBuffer); - break; - default: - throw new IOException("IllegalRequest"); - } - - dos.writeBoolean(response.getResult()); - dos.flush(); - - return true; - } - - @Override - public void stop() throws IOException { - try { - super.stop(); - } finally { - cache.shutdown(); - } - } - - @Override - protected void finalize() throws Throwable { - if (!stopped) - stop(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java deleted file mode 100644 index 920529d..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.distributed.cache.server.map; - -import java.io.File; - -import javax.net.ssl.SSLContext; - -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.distributed.cache.server.CacheServer; -import org.apache.nifi.distributed.cache.server.DistributedCacheServer; -import org.apache.nifi.distributed.cache.server.EvictionPolicy; -import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.ssl.SSLContextService.ClientAuth; - -public class DistributedMapCacheServer extends DistributedCacheServer { - - @Override - protected CacheServer createCacheServer(final ConfigurationContext context) { - final int port = context.getProperty(PORT).asInteger(); - final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue(); - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger(); - final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue(); - - final SSLContext sslContext; - if ( sslContextService == null ) { - sslContext = null; - } else { - sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); - } - - final EvictionPolicy evictionPolicy; - switch (evictionPolicyName) { - case EVICTION_STRATEGY_FIFO: - evictionPolicy = EvictionPolicy.FIFO; - break; - case EVICTION_STRATEGY_LFU: - evictionPolicy = EvictionPolicy.LFU; - break; - case EVICTION_STRATEGY_LRU: - evictionPolicy = EvictionPolicy.LRU; - break; - default: - throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName); - } - - try { - final File persistenceDir = persistencePath == null ? 
null : new File(persistencePath); - - return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java deleted file mode 100644 index 534cb0b..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.distributed.cache.server.map; - -import java.io.IOException; -import java.nio.ByteBuffer; - -public interface MapCache { - - MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException; - boolean containsKey(ByteBuffer key) throws IOException; - ByteBuffer get(ByteBuffer key) throws IOException; - ByteBuffer remove(ByteBuffer key) throws IOException; - void shutdown() throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java deleted file mode 100644 index b0ab0c4..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.distributed.cache.server.map; - -import java.nio.ByteBuffer; - -import org.apache.nifi.distributed.cache.server.CacheRecord; - -public class MapCacheRecord extends CacheRecord { - private final ByteBuffer key; - private final ByteBuffer value; - - public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) { - this.key = key; - this.value = value; - } - - public ByteBuffer getKey() { - return key; - } - - public ByteBuffer getValue() { - return value; - } - - @Override - public int hashCode() { - return 2938476 + key.hashCode() * value.hashCode(); - } - - @Override - public boolean equals(final Object obj) { - if ( obj == this ) { - return true; - } - - if ( obj instanceof MapCacheRecord ) { - final MapCacheRecord that = ((MapCacheRecord) obj); - return key.equals(that.key) && value.equals(that.value); - } - - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java deleted file mode 100644 index 3e8dd0e..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.distributed.cache.server.map; - -import java.io.DataInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; - -import javax.net.ssl.SSLContext; - -import org.apache.nifi.distributed.cache.server.AbstractCacheServer; -import org.apache.nifi.distributed.cache.server.EvictionPolicy; -import org.apache.nifi.io.DataOutputStream; - -public class MapCacheServer extends AbstractCacheServer { - - private final MapCache cache; - - public MapCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize, - final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException { - super(identifier, sslContext, port); - - final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, evictionPolicy); - - if (persistencePath == null) { - this.cache = simpleCache; - } else { - final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache); - persistentCache.restore(); - this.cache = persistentCache; - } - } - - @Override - protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException { - final DataInputStream dis = new 
DataInputStream(in); - final DataOutputStream dos = new DataOutputStream(out); - final String action = dis.readUTF(); - try { - switch (action) { - case "close": { - return false; - } - case "putIfAbsent": { - final byte[] key = readValue(dis); - final byte[] value = readValue(dis); - final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); - dos.writeBoolean(putResult.isSuccessful()); - break; - } - case "containsKey": { - final byte[] key = readValue(dis); - final boolean contains = cache.containsKey(ByteBuffer.wrap(key)); - dos.writeBoolean(contains); - break; - } - case "getAndPutIfAbsent": { - final byte[] key = readValue(dis); - final byte[] value = readValue(dis); - - final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); - if (putResult.isSuccessful()) { - // Put was successful. There was no old value to get. - dos.writeInt(0); - } else { - // we didn't put. Write back the previous value - final byte[] byteArray = putResult.getExistingValue().array(); - dos.writeInt(byteArray.length); - dos.write(byteArray); - } - - break; - } - case "get": { - final byte[] key = readValue(dis); - final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key)); - if (existingValue == null) { - // there was no existing value; we did a "put". - dos.writeInt(0); - } else { - // a value already existed. 
we did not update the map - final byte[] byteArray = existingValue.array(); - dos.writeInt(byteArray.length); - dos.write(byteArray); - } - - break; - } - case "remove": { - final byte[] key = readValue(dis); - final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null; - dos.writeBoolean(removed); - break; - } - default: { - throw new IOException("Illegal Request"); - } - } - } finally { - dos.flush(); - } - - return true; - } - - @Override - public void stop() throws IOException { - try { - super.stop(); - } finally { - cache.shutdown(); - } - } - - @Override - protected void finalize() throws Throwable { - if (!stopped) - stop(); - } - - private byte[] readValue(final DataInputStream dis) throws IOException { - final int numBytes = dis.readInt(); - final byte[] buffer = new byte[numBytes]; - dis.readFully(buffer); - return buffer; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java deleted file mode 100644 index 29695eb..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
/**
 * Immutable outcome of a put-if-absent call: whether the put happened, the key and value that
 * were offered, the value that was already present (if any) and the entry that was evicted
 * (if any). All buffers are stored and returned by reference, without copying.
 */
public class MapPutResult {

    private final boolean successful;
    private final ByteBuffer key;
    private final ByteBuffer value;
    private final ByteBuffer existingValue;
    private final ByteBuffer evictedKey;
    private final ByteBuffer evictedValue;

    /**
     * @param successful whether the offered entry was stored
     * @param key key that was offered
     * @param value value that was offered
     * @param existingValue value already present for the key, may be {@code null}
     * @param evictedKey key of the evicted entry, may be {@code null}
     * @param evictedValue value of the evicted entry, may be {@code null}
     */
    public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value, final ByteBuffer existingValue,
            final ByteBuffer evictedKey, final ByteBuffer evictedValue) {
        this.evictedValue = evictedValue;
        this.evictedKey = evictedKey;
        this.existingValue = existingValue;
        this.value = value;
        this.key = key;
        this.successful = successful;
    }

    /** @return whether the offered entry was stored */
    public boolean isSuccessful() {
        return this.successful;
    }

    /** @return the key that was offered */
    public ByteBuffer getKey() {
        return this.key;
    }

    /** @return the value that was offered */
    public ByteBuffer getValue() {
        return this.value;
    }

    /** @return the value already present for the key, or {@code null} if none was supplied */
    public ByteBuffer getExistingValue() {
        return this.existingValue;
    }

    /** @return the key of the evicted entry, or {@code null} if none was supplied */
    public ByteBuffer getEvictedKey() {
        return this.evictedKey;
    }

    /** @return the value of the evicted entry, or {@code null} if none was supplied */
    public ByteBuffer getEvictedValue() {
        return this.evictedValue;
    }
}
a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java deleted file mode 100644 index 77fb77d..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.distributed.cache.server.map; - -import java.io.DataInputStream; -import java.io.EOFException; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import org.wali.MinimalLockingWriteAheadLog; -import org.wali.SerDe; -import org.wali.UpdateType; -import org.wali.WriteAheadRepository; - -public class PersistentMapCache implements MapCache { - - private final MapCache wrapped; - private final WriteAheadRepository<MapWaliRecord> wali; - - private final AtomicLong modifications = new AtomicLong(0L); - - public PersistentMapCache(final String serviceIdentifier, final File persistencePath, final MapCache cacheToWrap) throws IOException { - wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null); - wrapped = cacheToWrap; - } - - synchronized void restore() throws IOException { - final Collection<MapWaliRecord> recovered = wali.recoverRecords(); - for ( final MapWaliRecord record : recovered ) { - if ( record.getUpdateType() == UpdateType.CREATE ) { - wrapped.putIfAbsent(record.getKey(), record.getValue()); - } - } - } - - @Override - public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException { - final MapPutResult putResult = wrapped.putIfAbsent(key, value); - if ( putResult.isSuccessful() ) { - // The put was successful. 
- final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value); - final List<MapWaliRecord> records = new ArrayList<>(); - records.add(record); - - if ( putResult.getEvictedKey() != null ) { - records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue())); - } - - wali.update(Collections.singletonList(record), false); - - final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 100000 == 0 ) { - wali.checkpoint(); - } - } - - return putResult; - } - - @Override - public boolean containsKey(final ByteBuffer key) throws IOException { - return wrapped.containsKey(key); - } - - @Override - public ByteBuffer get(final ByteBuffer key) throws IOException { - return wrapped.get(key); - } - - @Override - public ByteBuffer remove(ByteBuffer key) throws IOException { - final ByteBuffer removeResult = wrapped.remove(key); - if ( removeResult != null ) { - final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult); - final List<MapWaliRecord> records = new ArrayList<>(1); - records.add(record); - wali.update(records, false); - - final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 1000 == 0 ) { - wali.checkpoint(); - } - } - return removeResult; - } - - - @Override - public void shutdown() throws IOException { - wali.shutdown(); - } - - - private static class MapWaliRecord { - private final UpdateType updateType; - private final ByteBuffer key; - private final ByteBuffer value; - - public MapWaliRecord(final UpdateType updateType, final ByteBuffer key, final ByteBuffer value) { - this.updateType = updateType; - this.key = key; - this.value = value; - } - - public UpdateType getUpdateType() { - return updateType; - } - - public ByteBuffer getKey() { - return key; - } - - public ByteBuffer getValue() { - return value; - } - } - - private static class Serde implements SerDe<MapWaliRecord> { - - @Override - public void 
serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException { - final UpdateType updateType = newRecordState.getUpdateType(); - if ( updateType == UpdateType.DELETE ) { - out.write(0); - } else { - out.write(1); - } - - final byte[] key = newRecordState.getKey().array(); - final byte[] value = newRecordState.getValue().array(); - - out.writeInt(key.length); - out.write(key); - out.writeInt(value.length); - out.write(value); - } - - @Override - public void serializeRecord(MapWaliRecord record, java.io.DataOutputStream out) throws IOException { - serializeEdit(null, record, out); - } - - @Override - public MapWaliRecord deserializeEdit(final DataInputStream in, final Map<Object, MapWaliRecord> currentRecordStates, final int version) throws IOException { - final int updateTypeValue = in.read(); - if ( updateTypeValue < 0 ) { - throw new EOFException(); - } - - final UpdateType updateType = (updateTypeValue == 0 ? UpdateType.DELETE : UpdateType.CREATE); - - final int keySize = in.readInt(); - final byte[] key = new byte[keySize]; - in.readFully(key); - - final int valueSize = in.readInt(); - final byte[] value = new byte[valueSize]; - in.readFully(value); - - return new MapWaliRecord(updateType, ByteBuffer.wrap(key), ByteBuffer.wrap(value)); - } - - @Override - public MapWaliRecord deserializeRecord(DataInputStream in, int version) throws IOException { - return deserializeEdit(in, new HashMap<Object, MapWaliRecord>(), version); - } - - @Override - public Object getRecordIdentifier(final MapWaliRecord record) { - return record.getKey(); - } - - @Override - public UpdateType getUpdateType(final MapWaliRecord record) { - return record.getUpdateType(); - } - - @Override - public String getLocation(final MapWaliRecord record) { - return null; - } - - @Override - public int getVersion() { - return 1; - } - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java deleted file mode 100644 index 10139f1..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.distributed.cache.server.map; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.SortedMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.nifi.distributed.cache.server.EvictionPolicy; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SimpleMapCache implements MapCache { - private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class); - - private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>(); - private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap; - - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); - private final Lock writeLock = rwLock.writeLock(); - - private final String serviceIdentifier; - - private final int maxSize; - - public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) { - // need to change to ConcurrentMap as this is modified when only the readLock is held - inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator()); - this.serviceIdentifier = serviceIdentifier; - this.maxSize = maxSize; - } - - @Override - public String toString() { - return "SimpleSetCache[service id=" + serviceIdentifier + "]"; - } - - // don't need synchronized because this method is only called when the writeLock is held, and all - // public methods obtain either the read or write lock - private MapCacheRecord evict() { - if ( cache.size() < maxSize ) { - return null; - } - - final MapCacheRecord recordToEvict = inverseCacheMap.firstKey(); - final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict); - cache.remove(valueToEvict); 
- - if ( logger.isDebugEnabled() ) { - logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8)); - } - - return recordToEvict; - } - - @Override - public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) { - writeLock.lock(); - try { - final MapCacheRecord record = cache.get(key); - if ( record == null ) { - // Record is null. We will add. - final MapCacheRecord evicted = evict(); - final MapCacheRecord newRecord = new MapCacheRecord(key, value); - cache.put(key, newRecord); - inverseCacheMap.put(newRecord, key); - - if ( evicted == null ) { - return new MapPutResult(true, key, value, null, null, null); - } else { - return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue()); - } - } - - // Record is not null. Increment hit count and return result indicating that record was not added. - inverseCacheMap.remove(record); - record.hit(); - inverseCacheMap.put(record, key); - - return new MapPutResult(false, key, value, record.getValue(), null, null); - } finally { - writeLock.unlock(); - } - } - - @Override - public boolean containsKey(final ByteBuffer key) { - readLock.lock(); - try { - final MapCacheRecord record = cache.get(key); - if ( record == null ) { - return false; - } - - inverseCacheMap.remove(record); - record.hit(); - inverseCacheMap.put(record, key); - - return true; - } finally { - readLock.unlock(); - } - } - - @Override - public ByteBuffer get(final ByteBuffer key) { - readLock.lock(); - try { - final MapCacheRecord record = cache.get(key); - if ( record == null ) { - return null; - } - - inverseCacheMap.remove(record); - record.hit(); - inverseCacheMap.put(record, key); - - return record.getValue(); - } finally { - readLock.unlock(); - } - } - - @Override - public ByteBuffer remove(ByteBuffer key) throws IOException { - writeLock.lock(); - try { - final MapCacheRecord record = cache.remove(key); - if (record == null) { - return null; - } - 
inverseCacheMap.remove(record); - return record.getValue(); - } finally { - writeLock.unlock(); - } - } - - @Override - public void shutdown() throws IOException { - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java deleted file mode 100644 index 4d75fc0..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.distributed.cache.server.set; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import org.wali.MinimalLockingWriteAheadLog; -import org.wali.SerDe; -import org.wali.UpdateType; -import org.wali.WriteAheadRepository; - -public class PersistentSetCache implements SetCache { - - private final SetCache wrapped; - private final WriteAheadRepository<SetRecord> wali; - - private final AtomicLong modifications = new AtomicLong(0L); - - public PersistentSetCache(final String serviceIdentifier, final File persistencePath, final SetCache cacheToWrap) throws IOException { - wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null); - wrapped = cacheToWrap; - } - - public synchronized void restore() throws IOException { - final Collection<SetRecord> recovered = wali.recoverRecords(); - for ( final SetRecord record : recovered ) { - if ( record.getUpdateType() == UpdateType.CREATE ) { - addIfAbsent(record.getBuffer()); - } - } - } - - @Override - public synchronized SetCacheResult remove(final ByteBuffer value) throws IOException { - final SetCacheResult removeResult = wrapped.remove(value); - if ( removeResult.getResult() ) { - final SetRecord record = new SetRecord(UpdateType.DELETE, value); - final List<SetRecord> records = new ArrayList<>(); - records.add(record); - wali.update(records, false); - - final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 1000 == 0 ) { - wali.checkpoint(); - } - } - - return removeResult; - } - - @Override - public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) throws IOException { - final SetCacheResult addResult = 
wrapped.addIfAbsent(value); - if ( addResult.getResult() ) { - final SetRecord record = new SetRecord(UpdateType.CREATE, value); - final List<SetRecord> records = new ArrayList<>(); - records.add(record); - - final SetCacheRecord evictedRecord = addResult.getEvictedRecord(); - if ( evictedRecord != null ) { - records.add(new SetRecord(UpdateType.DELETE, evictedRecord.getValue())); - } - - wali.update(records, false); - - final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 1000 == 0 ) { - wali.checkpoint(); - } - } - - return addResult; - } - - @Override - public synchronized SetCacheResult contains(final ByteBuffer value) throws IOException { - return wrapped.contains(value); - } - - @Override - public void shutdown() throws IOException { - wali.shutdown(); - } - - private static class SetRecord { - private final UpdateType updateType; - private final ByteBuffer value; - - public SetRecord(final UpdateType updateType, final ByteBuffer value) { - this.updateType = updateType; - this.value = value; - } - - public UpdateType getUpdateType() { - return updateType; - } - - public ByteBuffer getBuffer() { - return value; - } - - public byte[] getData() { - return value.array(); - } - } - - private static class Serde implements SerDe<SetRecord> { - - @Override - public void serializeEdit(final SetRecord previousRecordState, final SetRecord newRecordState, final DataOutputStream out) throws IOException { - final UpdateType updateType = newRecordState.getUpdateType(); - if ( updateType == UpdateType.DELETE ) { - out.write(0); - } else { - out.write(1); - } - - final byte[] data = newRecordState.getData(); - out.writeInt(data.length); - out.write(newRecordState.getData()); - } - - @Override - public void serializeRecord(SetRecord record, DataOutputStream out) throws IOException { - serializeEdit(null, record, out); - } - - @Override - public SetRecord deserializeEdit(final DataInputStream in, final Map<Object, SetRecord> 
currentRecordStates, final int version) throws IOException { - final int value = in.read(); - if ( value < 0 ) { - throw new EOFException(); - } - - final UpdateType updateType = (value == 0 ? UpdateType.DELETE : UpdateType.CREATE); - - final int size = in.readInt(); - final byte[] data = new byte[size]; - in.readFully(data); - - return new SetRecord(updateType, ByteBuffer.wrap(data)); - } - - @Override - public SetRecord deserializeRecord(DataInputStream in, int version) throws IOException { - return deserializeEdit(in, new HashMap<Object, SetRecord>(), version); - } - - @Override - public Object getRecordIdentifier(final SetRecord record) { - return record.getBuffer(); - } - - @Override - public UpdateType getUpdateType(final SetRecord record) { - return record.getUpdateType(); - } - - @Override - public String getLocation(final SetRecord record) { - return null; - } - - @Override - public int getVersion() { - return 1; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java deleted file mode 100644 index bf6ae3e..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.distributed.cache.server.set; - -import java.io.IOException; -import java.nio.ByteBuffer; - -public interface SetCache { - - SetCacheResult remove(ByteBuffer value) throws IOException; - SetCacheResult addIfAbsent(ByteBuffer value) throws IOException; - SetCacheResult contains(ByteBuffer value) throws IOException; - void shutdown() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java deleted file mode 100644 index 20b6fae..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.distributed.cache.server.set; - -import java.nio.ByteBuffer; - -import org.apache.nifi.distributed.cache.server.CacheRecord; - -public class SetCacheRecord extends CacheRecord { - private final ByteBuffer value; - - public SetCacheRecord(final ByteBuffer value) { - this.value = value; - } - - public ByteBuffer getValue() { - return value; - } - - @Override - public int hashCode() { - return value.hashCode(); - } - - @Override - public boolean equals(final Object obj) { - if ( this == obj ) { - return true; - } - - if (obj instanceof SetCacheRecord) { - return value.equals(((SetCacheRecord) obj).value); - } - return false; - } - - @Override - public String toString() { - return "SetCacheRecord[value=" + new String(value.array()) + ", hitCount=" + getHitCount() + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java 
b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java deleted file mode 100644 index 732c4f0..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.distributed.cache.server.set; - - - -public class SetCacheResult { - private final boolean result; - private final SetCacheRecord stats; - private final SetCacheRecord evictedRecord; - - public SetCacheResult(final boolean result, final SetCacheRecord stats, final SetCacheRecord evictedRecord) { - this.result = result; - this.stats = stats; - this.evictedRecord = evictedRecord; - } - - public boolean getResult() { - return result; - } - - public SetCacheRecord getRecord() { - return stats; - } - - public SetCacheRecord getEvictedRecord() { - return evictedRecord; - } -}