http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java new file mode 100644 index 0000000..29695eb --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import java.nio.ByteBuffer; + +public class MapPutResult { + private final boolean successful; + private final ByteBuffer key, value; + private final ByteBuffer existingValue; + private final ByteBuffer evictedKey, evictedValue; + + public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value, final ByteBuffer existingValue, final ByteBuffer evictedKey, final ByteBuffer evictedValue) { + this.successful = successful; + this.key = key; + this.value = value; + this.existingValue = existingValue; + this.evictedKey = evictedKey; + this.evictedValue = evictedValue; + } + + public boolean isSuccessful() { + return successful; + } + + public ByteBuffer getKey() { + return key; + } + + public ByteBuffer getValue() { + return value; + } + + public ByteBuffer getExistingValue() { + return existingValue; + } + + public ByteBuffer getEvictedKey() { + return evictedKey; + } + + public ByteBuffer getEvictedValue() { + return evictedValue; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java new file mode 100644 index 0000000..77fb77d --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.wali.MinimalLockingWriteAheadLog; +import org.wali.SerDe; +import org.wali.UpdateType; +import org.wali.WriteAheadRepository; + +public class PersistentMapCache implements MapCache { + + private final MapCache wrapped; + private final WriteAheadRepository<MapWaliRecord> wali; + + private final AtomicLong modifications = new AtomicLong(0L); + + public PersistentMapCache(final String serviceIdentifier, final File persistencePath, final MapCache cacheToWrap) throws IOException { + wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null); + wrapped = cacheToWrap; + } + + synchronized void restore() throws IOException { + final Collection<MapWaliRecord> recovered = wali.recoverRecords(); + for ( final MapWaliRecord record : recovered ) { + if ( record.getUpdateType() == UpdateType.CREATE ) { + wrapped.putIfAbsent(record.getKey(), record.getValue()); + } + } + } + + @Override + public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException { + final MapPutResult putResult = wrapped.putIfAbsent(key, value); + if ( putResult.isSuccessful() ) { + // The put was successful. + final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value); + final List<MapWaliRecord> records = new ArrayList<>(); + records.add(record); + + if ( putResult.getEvictedKey() != null ) { + records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue())); + } + + wali.update(Collections.singletonList(record), false); + + final long modCount = modifications.getAndIncrement(); + if ( modCount > 0 && modCount % 100000 == 0 ) { + wali.checkpoint(); + } + } + + return putResult; + } + + @Override + public boolean containsKey(final ByteBuffer key) throws IOException { + return wrapped.containsKey(key); + } + + @Override + public ByteBuffer get(final ByteBuffer key) throws IOException { + return wrapped.get(key); + } + + @Override + public ByteBuffer remove(ByteBuffer key) throws IOException { + final ByteBuffer removeResult = wrapped.remove(key); + if ( removeResult != null ) { + final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult); + final List<MapWaliRecord> records = new ArrayList<>(1); + records.add(record); + wali.update(records, false); + + final long modCount = modifications.getAndIncrement(); + if ( modCount > 0 && modCount % 1000 == 0 ) { + wali.checkpoint(); + } + } + return removeResult; + } + + + @Override + public void shutdown() throws IOException { + wali.shutdown(); + } + + + private static class MapWaliRecord { + private final UpdateType updateType; + private final ByteBuffer key; + private final ByteBuffer value; + + public MapWaliRecord(final UpdateType updateType, final ByteBuffer key, final ByteBuffer value) { + this.updateType = updateType; + this.key = key; + this.value = value; + } + + public UpdateType getUpdateType() { + return updateType; + } + + public ByteBuffer getKey() { + return key; + } + + public ByteBuffer getValue() { + return value; + } + } + + private static class Serde implements SerDe<MapWaliRecord> { + + @Override + public void serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException { + final UpdateType updateType = newRecordState.getUpdateType(); + if ( updateType == UpdateType.DELETE ) { + out.write(0); + } else { + out.write(1); + } + + final byte[] key = newRecordState.getKey().array(); + final byte[] value = newRecordState.getValue().array(); + + out.writeInt(key.length); + out.write(key); + out.writeInt(value.length); + out.write(value); + } + + @Override + public void serializeRecord(MapWaliRecord record, java.io.DataOutputStream out) throws IOException { + serializeEdit(null, record, out); + } + + @Override + public MapWaliRecord deserializeEdit(final DataInputStream in, final Map<Object, MapWaliRecord> currentRecordStates, final int version) throws IOException { + final int updateTypeValue = in.read(); + if ( updateTypeValue < 0 ) { + throw new EOFException(); + } + + final UpdateType updateType = (updateTypeValue == 0 ? UpdateType.DELETE : UpdateType.CREATE); + + final int keySize = in.readInt(); + final byte[] key = new byte[keySize]; + in.readFully(key); + + final int valueSize = in.readInt(); + final byte[] value = new byte[valueSize]; + in.readFully(value); + + return new MapWaliRecord(updateType, ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + } + + @Override + public MapWaliRecord deserializeRecord(DataInputStream in, int version) throws IOException { + return deserializeEdit(in, new HashMap<Object, MapWaliRecord>(), version); + } + + @Override + public Object getRecordIdentifier(final MapWaliRecord record) { + return record.getKey(); + } + + @Override + public UpdateType getUpdateType(final MapWaliRecord record) { + return record.getUpdateType(); + } + + @Override + public String getLocation(final MapWaliRecord record) { + return null; + } + + @Override + public int getVersion() { + return 1; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java new file mode 100644 index 0000000..10139f1 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.nifi.distributed.cache.server.EvictionPolicy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SimpleMapCache implements MapCache { + private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class); + + private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>(); + private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap; + + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + private final String serviceIdentifier; + + private final int maxSize; + + public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) { + // need to change to ConcurrentMap as this is modified when only the readLock is held + inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator()); + this.serviceIdentifier = serviceIdentifier; + this.maxSize = maxSize; + } + + @Override + public String toString() { + return "SimpleSetCache[service id=" + serviceIdentifier + "]"; + } + + // don't need synchronized because this method is only called when the writeLock is held, and all + // public methods obtain either the read or write lock + private MapCacheRecord evict() { + if ( cache.size() < maxSize ) { + return null; + } + + final MapCacheRecord recordToEvict = inverseCacheMap.firstKey(); + final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict); + cache.remove(valueToEvict); + + if ( logger.isDebugEnabled() ) { + logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8)); + } + + return recordToEvict; + } + + @Override + public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) { + writeLock.lock(); + try { + final MapCacheRecord record = cache.get(key); + if ( record == null ) { + // Record is null. We will add. + final MapCacheRecord evicted = evict(); + final MapCacheRecord newRecord = new MapCacheRecord(key, value); + cache.put(key, newRecord); + inverseCacheMap.put(newRecord, key); + + if ( evicted == null ) { + return new MapPutResult(true, key, value, null, null, null); + } else { + return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue()); + } + } + + // Record is not null. Increment hit count and return result indicating that record was not added. + inverseCacheMap.remove(record); + record.hit(); + inverseCacheMap.put(record, key); + + return new MapPutResult(false, key, value, record.getValue(), null, null); + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean containsKey(final ByteBuffer key) { + readLock.lock(); + try { + final MapCacheRecord record = cache.get(key); + if ( record == null ) { + return false; + } + + inverseCacheMap.remove(record); + record.hit(); + inverseCacheMap.put(record, key); + + return true; + } finally { + readLock.unlock(); + } + } + + @Override + public ByteBuffer get(final ByteBuffer key) { + readLock.lock(); + try { + final MapCacheRecord record = cache.get(key); + if ( record == null ) { + return null; + } + + inverseCacheMap.remove(record); + record.hit(); + inverseCacheMap.put(record, key); + + return record.getValue(); + } finally { + readLock.unlock(); + } + } + + @Override + public ByteBuffer remove(ByteBuffer key) throws IOException { + writeLock.lock(); + try { + final MapCacheRecord record = cache.remove(key); + if (record == null) { + return null; + } + inverseCacheMap.remove(record); + return record.getValue(); + } finally { + writeLock.unlock(); + } + } + + @Override + public void shutdown() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java new file mode 100644 index 0000000..4d75fc0 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.set; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.wali.MinimalLockingWriteAheadLog; +import org.wali.SerDe; +import org.wali.UpdateType; +import org.wali.WriteAheadRepository; + +public class PersistentSetCache implements SetCache { + + private final SetCache wrapped; + private final WriteAheadRepository<SetRecord> wali; + + private final AtomicLong modifications = new AtomicLong(0L); + + public PersistentSetCache(final String serviceIdentifier, final File persistencePath, final SetCache cacheToWrap) throws IOException { + wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null); + wrapped = cacheToWrap; + } + + public synchronized void restore() throws IOException { + final Collection<SetRecord> recovered = wali.recoverRecords(); + for ( final SetRecord record : recovered ) { + if ( record.getUpdateType() == UpdateType.CREATE ) { + addIfAbsent(record.getBuffer()); + } + } + } + + @Override + public synchronized SetCacheResult remove(final ByteBuffer value) throws IOException { + final SetCacheResult removeResult = wrapped.remove(value); + if ( removeResult.getResult() ) { + final SetRecord record = new SetRecord(UpdateType.DELETE, value); + final List<SetRecord> records = new ArrayList<>(); + records.add(record); + wali.update(records, false); + + final long modCount = modifications.getAndIncrement(); + if ( modCount > 0 && modCount % 1000 == 0 ) { + wali.checkpoint(); + } + } + + return removeResult; + } + + @Override + public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) throws IOException { + final SetCacheResult addResult = wrapped.addIfAbsent(value); + if ( addResult.getResult() ) { + final SetRecord record = new SetRecord(UpdateType.CREATE, value); + final List<SetRecord> records = new ArrayList<>(); + records.add(record); + + final SetCacheRecord evictedRecord = addResult.getEvictedRecord(); + if ( evictedRecord != null ) { + records.add(new SetRecord(UpdateType.DELETE, evictedRecord.getValue())); + } + + wali.update(records, false); + + final long modCount = modifications.getAndIncrement(); + if ( modCount > 0 && modCount % 1000 == 0 ) { + wali.checkpoint(); + } + } + + return addResult; + } + + @Override + public synchronized SetCacheResult contains(final ByteBuffer value) throws IOException { + return wrapped.contains(value); + } + + @Override + public void shutdown() throws IOException { + wali.shutdown(); + } + + private static class SetRecord { + private final UpdateType updateType; + private final ByteBuffer value; + + public SetRecord(final UpdateType updateType, final ByteBuffer value) { + this.updateType = updateType; + this.value = value; + } + + public UpdateType getUpdateType() { + return updateType; + } + + public ByteBuffer getBuffer() { + return value; + } + + public byte[] getData() { + return value.array(); + } + } + + private static class Serde implements SerDe<SetRecord> { + + @Override + public void serializeEdit(final SetRecord previousRecordState, final SetRecord newRecordState, final DataOutputStream out) throws IOException { + final UpdateType updateType = newRecordState.getUpdateType(); + if ( updateType == UpdateType.DELETE ) { + out.write(0); + } else { + out.write(1); + } + + final byte[] data = newRecordState.getData(); + out.writeInt(data.length); + out.write(newRecordState.getData()); + } + + @Override + public void serializeRecord(SetRecord record, DataOutputStream out) throws IOException { + serializeEdit(null, record, out); + } + + @Override + public SetRecord deserializeEdit(final DataInputStream in, final Map<Object, SetRecord> currentRecordStates, final int version) throws IOException { + final int value = in.read(); + if ( value < 0 ) { + throw new EOFException(); + } + + final UpdateType updateType = (value == 0 ? UpdateType.DELETE : UpdateType.CREATE); + + final int size = in.readInt(); + final byte[] data = new byte[size]; + in.readFully(data); + + return new SetRecord(updateType, ByteBuffer.wrap(data)); + } + + @Override + public SetRecord deserializeRecord(DataInputStream in, int version) throws IOException { + return deserializeEdit(in, new HashMap<Object, SetRecord>(), version); + } + + @Override + public Object getRecordIdentifier(final SetRecord record) { + return record.getBuffer(); + } + + @Override + public UpdateType getUpdateType(final SetRecord record) { + return record.getUpdateType(); + } + + @Override + public String getLocation(final SetRecord record) { + return null; + } + + @Override + public int getVersion() { + return 1; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java new file mode 100644 index 0000000..bf6ae3e --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.set; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface SetCache { + + SetCacheResult remove(ByteBuffer value) throws IOException; + SetCacheResult addIfAbsent(ByteBuffer value) throws IOException; + SetCacheResult contains(ByteBuffer value) throws IOException; + void shutdown() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java new file mode 100644 index 0000000..20b6fae --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.set; + +import java.nio.ByteBuffer; + +import org.apache.nifi.distributed.cache.server.CacheRecord; + +public class SetCacheRecord extends CacheRecord { + private final ByteBuffer value; + + public SetCacheRecord(final ByteBuffer value) { + this.value = value; + } + + public ByteBuffer getValue() { + return value; + } + + @Override + public int hashCode() { + return value.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if ( this == obj ) { + return true; + } + + if (obj instanceof SetCacheRecord) { + return value.equals(((SetCacheRecord) obj).value); + } + return false; + } + + @Override + public String toString() { + return "SetCacheRecord[value=" + new String(value.array()) + ", hitCount=" + getHitCount() + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java new file mode 100644 index 0000000..732c4f0 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.set; + + + +public class SetCacheResult { + private final boolean result; + private final SetCacheRecord stats; + private final SetCacheRecord evictedRecord; + + public SetCacheResult(final boolean result, final SetCacheRecord stats, final SetCacheRecord evictedRecord) { + this.result = result; + this.stats = stats; + this.evictedRecord = evictedRecord; + } + + public boolean getResult() { + return result; + } + + public SetCacheRecord getRecord() { + return stats; + } + + public SetCacheRecord getEvictedRecord() { + return evictedRecord; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java new file mode 100644 index 0000000..77d6481 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.set; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.nifi.distributed.cache.server.EvictionPolicy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SimpleSetCache implements SetCache { + private static final Logger logger = LoggerFactory.getLogger(SimpleSetCache.class); + + private final Map<ByteBuffer, SetCacheRecord> cache = new HashMap<>(); + private final SortedMap<SetCacheRecord, ByteBuffer> inverseCacheMap; + + private final String serviceIdentifier; + + private final int maxSize; + + public SimpleSetCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) { + inverseCacheMap = new TreeMap<>(evictionPolicy.getComparator()); + this.serviceIdentifier = serviceIdentifier; + this.maxSize = maxSize; + } + + private synchronized SetCacheRecord evict() { + if ( cache.size() < maxSize ) { + return null; + } + + final SetCacheRecord recordToEvict = inverseCacheMap.firstKey(); + final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict); + cache.remove(valueToEvict); + + if ( logger.isDebugEnabled() ) { + logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8)); + } + + return recordToEvict; + } + + @Override + public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) { + final SetCacheRecord record = cache.get(value); + if ( record == null ) { + final SetCacheRecord evicted = evict(); + final SetCacheRecord newRecord = new SetCacheRecord(value); + cache.put(value, newRecord); + inverseCacheMap.put(newRecord, value); + return new SetCacheResult(true, newRecord, evicted); + } else { + // We have to remove the record and add it again in order to cause the Map to stay sorted + inverseCacheMap.remove(record); + record.hit(); + inverseCacheMap.put(record, value); + + return new SetCacheResult(false, record, null); + } + } + + @Override + public synchronized SetCacheResult contains(final ByteBuffer value) { + final SetCacheRecord record = cache.get(value); + if ( record == null ) { + return new SetCacheResult(false, null, null); + } else { + // We have to remove the record and add it again in order to cause the Map to stay sorted + inverseCacheMap.remove(record); + record.hit(); + inverseCacheMap.put(record, value); + + return new SetCacheResult(true, record, null); + } + } + + @Override + public synchronized SetCacheResult remove(final ByteBuffer value) { + final SetCacheRecord record = cache.remove(value); + if ( record == null ) { + return new SetCacheResult(false, null, null); + } else { + inverseCacheMap.remove(record); + return new SetCacheResult(true, record, null); + } + } + + @Override + public String toString() { + return "SimpleSetCache[service id=" + serviceIdentifier + "]"; + } + + @Override + public void shutdown() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..0509c7c --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.distributed.cache.server.DistributedSetCacheServer +org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html new file mode 100644 index 0000000..dca3aa1 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html @@ -0,0 +1,82 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> +<meta charset="utf-8" /> +<title>Distributed Map Cache Client Service</title> +<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> +</head> + +<body> + <h2>Description:</h2> + + <p>A Controller Service that starts an embedded server and listens for connections from clients. The + server provides the ability to query the cache, add data to the cache, and remove data from the cache.</p> + + + + <p> + <strong>Properties:</strong> + </p> + <p>In the list below, the names of required properties appear + in bold. Any other properties (not in bold) are considered optional. + If a property has a default value, it is indicated. If a property + supports the use of the NiFi Expression Language (or simply, + "expression language"), that is also indicated.</p> + + <ul> + <li><strong>Port</strong> + <ul> + <li>The port to listen on for incoming connections</li> + <li>Default value: 4557</li> + <li>Supports expression language: false</li> + </ul></li> + <li>SSL Context Service + <ul> + <li>If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure</li> + <li>Default value: no default</li> + <li>Supports expression language: false</li> + </ul></li> + <li><strong>Maximum Cache Entries</strong> + <ul> + <li>The maximum number of cache entries that the cache can hold + <li>Default value: 10,000</li> + <li>Supports expression language: false</li> + </ul></li> + <li><strong>Eviction Strategy</strong> + <ul> + <li>Determines which strategy should be used to evict values from the cache to make room for new entries. Valid values: + <code>Least Frequently Used</code>, <code>Least Recently Used</code>, and <code>First In, First Out</code> + <li>Default value: Least Frequently Used</li> + <li>Supports expression language: false</li> + </ul></li> + <li>Persistence Directory + <ul> + <li>If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only</li> + <li>Default value: no default (in-memory)</li> + <li>Supports expression language: true - JVM and System Properties Only</li> + </ul></li> + </ul> + + + <i>See Also:</i> + <ul> + <li><a href="../org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html">Distributed Map Cache Client Service</a></li> + <li><a href="../org.apache.nifi.ssl.StandardSSLContextService/index.html">Standard SSL Context Service</a></li> + </ul> + +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java new file mode 100644 index 0000000..b5f3fd6 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server; + +import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.net.ConnectException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; +import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.MockControllerServiceInitializationContext; + +import org.apache.commons.lang3.SerializationException; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestServerAndClient { + + private static Logger LOGGER; + + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.server.AbstractCacheServer", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.client.DistributedMapCacheClientService", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.server.TestServerAndClient", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.remote.io.socket.ssl.SSLSocketChannel", "trace"); + LOGGER = LoggerFactory.getLogger(TestServerAndClient.class); + } + + @Test + public void testNonPersistentSetServerAndClient() throws InitializationException, IOException { + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); + // Create server + final DistributedSetCacheServer server = new DistributedSetCacheServer(); + MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); + server.initialize(serverInitContext); + + final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); + final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); + server.startServer(serverContext); + + final DistributedSetCacheClientService client = createClient(); + final Serializer<String> serializer = new StringSerializer(); + final boolean added = client.addIfAbsent("test", serializer); + assertTrue(added); + + final boolean contains = client.contains("test", serializer); + assertTrue(contains); + + final boolean addedAgain = client.addIfAbsent("test", serializer); + assertFalse(addedAgain); + + final boolean removed = client.remove("test", serializer); + assertTrue(removed); + + final boolean containedAfterRemove = client.contains("test", serializer); + assertFalse(containedAfterRemove); + + server.shutdownServer(); + } + + @Test + public void testPersistentSetServerAndClient() throws InitializationException, IOException { + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); + // Create server + final DistributedSetCacheServer server = new DistributedSetCacheServer(); + MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); + server.initialize(serverInitContext); + + final File dataFile = new File("target/cache-data"); + deleteRecursively(dataFile); + + final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); + serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); + server.startServer(serverContext); + + final DistributedSetCacheClientService client = createClient(); + final Serializer<String> serializer = new StringSerializer(); + final boolean added = client.addIfAbsent("test", serializer); + final boolean added2 = client.addIfAbsent("test2", serializer); + assertTrue(added); + assertTrue(added2); + + final boolean contains = client.contains("test", serializer); + final boolean contains2 = client.contains("test2", serializer); + assertTrue(contains); + assertTrue(contains2); + + final boolean addedAgain = client.addIfAbsent("test", serializer); + assertFalse(addedAgain); + + final boolean removed = client.remove("test", serializer); + assertTrue(removed); + + final boolean containedAfterRemove = client.contains("test", serializer); + assertFalse(containedAfterRemove); + + server.shutdownServer(); + + final DistributedSetCacheServer newServer = new DistributedSetCacheServer(); + MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer, "server2"); + newServer.initialize(newServerInitContext); + + final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties, + newServerInitContext.getControllerServiceLookup()); + newServer.startServer(newServerContext); + + assertFalse(client.contains("test", serializer)); + assertTrue(client.contains("test2", serializer)); + + newServer.shutdownServer(); + } + + @Test + public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException { + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); + // Create server + final DistributedSetCacheServer server = new DistributedSetCacheServer(); + MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); + server.initialize(serverInitContext); + + final File dataFile = new File("target/cache-data"); + deleteRecursively(dataFile); + + final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); + serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + serverProperties.put(DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); + + final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); + server.startServer(serverContext); + + final DistributedSetCacheClientService client = createClient(); + final Serializer<String> serializer = new StringSerializer(); + final boolean added = client.addIfAbsent("test", serializer); + waitABit(); + final boolean added2 = client.addIfAbsent("test2", serializer); + waitABit(); + final boolean added3 = client.addIfAbsent("test3", serializer); + waitABit(); + assertTrue(added); + assertTrue(added2); + assertTrue(added3); + + final boolean contains = client.contains("test", serializer); + final boolean contains2 = client.contains("test2", serializer); + assertTrue(contains); + assertTrue(contains2); + + final boolean addedAgain = client.addIfAbsent("test", serializer); + assertFalse(addedAgain); + + final boolean added4 = client.addIfAbsent("test4", serializer); + assertTrue(added4); + + // ensure that added3 was evicted because it was used least frequently + assertFalse(client.contains("test3", serializer)); + + server.shutdownServer(); + + final DistributedSetCacheServer newServer = new DistributedSetCacheServer(); + MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer, "server2"); + newServer.initialize(newServerInitContext); + + final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties, + newServerInitContext.getControllerServiceLookup()); + newServer.startServer(newServerContext); + + assertTrue(client.contains("test", serializer)); + assertTrue(client.contains("test2", serializer)); + assertFalse(client.contains("test3", serializer)); + assertTrue(client.contains("test4", serializer)); + + newServer.shutdownServer(); + } + + @Test + public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException { + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); + // Create server + final DistributedSetCacheServer server = new DistributedSetCacheServer(); + MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); + server.initialize(serverInitContext); + + final File dataFile = new File("target/cache-data"); + deleteRecursively(dataFile); + + final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); + serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + serverProperties.put(DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); + serverProperties.put(DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO); + + final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); + server.startServer(serverContext); + + final DistributedSetCacheClientService client = createClient(); + final Serializer<String> serializer = new StringSerializer(); + + // add 3 entries to the cache. But, if we add too fast, we'll have the same millisecond + // for the entry time so we don't know which entry will be evicted. So we wait a few millis in between + final boolean added = client.addIfAbsent("test", serializer); + waitABit(); + final boolean added2 = client.addIfAbsent("test2", serializer); + waitABit(); + final boolean added3 = client.addIfAbsent("test3", serializer); + waitABit(); + + assertTrue(added); + assertTrue(added2); + assertTrue(added3); + + final boolean contains = client.contains("test", serializer); + final boolean contains2 = client.contains("test2", serializer); + assertTrue(contains); + assertTrue(contains2); + + final boolean addedAgain = client.addIfAbsent("test", serializer); + assertFalse(addedAgain); + + final boolean added4 = client.addIfAbsent("test4", serializer); + assertTrue(added4); + + // ensure that added3 was evicted because it was used least frequently + assertFalse(client.contains("test", serializer)); + assertTrue(client.contains("test3", serializer)); + + server.shutdownServer(); + + final DistributedSetCacheServer newServer = new DistributedSetCacheServer(); + MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer, "server2"); + newServer.initialize(newServerInitContext); + + final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties, + newServerInitContext.getControllerServiceLookup()); + newServer.startServer(newServerContext); + + assertFalse(client.contains("test", serializer)); + assertTrue(client.contains("test2", serializer)); + assertTrue(client.contains("test3", serializer)); + assertTrue(client.contains("test4", serializer)); + + newServer.shutdownServer(); + } + + @Test + public void testNonPersistentMapServerAndClient() throws InitializationException, IOException, InterruptedException { + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); + // Create server + final DistributedMapCacheServer server = new DistributedMapCacheServer(); + MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); + server.initialize(serverInitContext); + + final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); + final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); + server.startServer(serverContext); + + DistributedMapCacheClientService client = new DistributedMapCacheClientService(); + MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); + client.initialize(clientInitContext); + + final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); + clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); + clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); + MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); + client.cacheConfig(clientContext); + final Serializer<String> valueSerializer = new StringSerializer(); + final Serializer<String> keySerializer = new StringSerializer(); + final Deserializer<String> deserializer = new StringDeserializer(); + + final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer); + assertEquals(null, original); + LOGGER.debug("end getAndPutIfAbsent"); + + final boolean contains = client.containsKey("testKey", keySerializer); + assertTrue(contains); + LOGGER.debug("end containsKey"); + + final boolean added = client.putIfAbsent("testKey", "test", keySerializer, valueSerializer); + assertFalse(added); + LOGGER.debug("end putIfAbsent"); + + final String originalAfterPut = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer); + assertEquals("test", originalAfterPut); + LOGGER.debug("end getAndPutIfAbsent"); + + final boolean removed = client.remove("testKey", keySerializer); + assertTrue(removed); + LOGGER.debug("end remove"); + + final boolean containedAfterRemove = client.containsKey("testKey", keySerializer); + assertFalse(containedAfterRemove); + + client.putIfAbsent("testKey", "test", keySerializer, valueSerializer); + client.close(); + try { + client.containsKey("testKey", keySerializer); + fail("Should be closed and not accessible"); + } catch (Exception e) { + + } + client = null; + clientInitContext = null; + clientContext = null; + + DistributedMapCacheClientService client2 = new DistributedMapCacheClientService(); + + MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2"); + client2.initialize(clientInitContext2); + + MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties, + clientInitContext2.getControllerServiceLookup()); + client2.cacheConfig(clientContext2); + assertFalse(client2.putIfAbsent("testKey", "test", keySerializer, valueSerializer)); + assertTrue(client2.containsKey("testKey", keySerializer)); + server.shutdownServer(); + Thread.sleep(1000); + try { + client2.containsKey("testKey", keySerializer); + fail("Should have blown exception!"); + } catch (ConnectException e) { + client2 = null; + clientContext2 = null; + clientInitContext2 = null; + } + Thread.sleep(2000); + System.gc(); + LOGGER.debug("end testNonPersistentMapServerAndClient"); + } + + @Test + public void testClientTermination() throws InitializationException, IOException, InterruptedException { + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); + // Create server + final DistributedMapCacheServer server = new DistributedMapCacheServer(); + MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); + server.initialize(serverInitContext); + + final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); + final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); + server.startServer(serverContext); + + DistributedMapCacheClientService client = new DistributedMapCacheClientService(); + MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); + client.initialize(clientInitContext); + + final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); + clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); + clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); + MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); + client.cacheConfig(clientContext); + final Serializer<String> valueSerializer = new StringSerializer(); + final Serializer<String> keySerializer = new StringSerializer(); + final Deserializer<String> deserializer = new StringDeserializer(); + + final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer); + assertEquals(null, original); + + final boolean contains = client.containsKey("testKey", keySerializer); + assertTrue(contains); + + final boolean added = client.putIfAbsent("testKey", "test", keySerializer, valueSerializer); + assertFalse(added); + + final String originalAfterPut = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer); + assertEquals("test", originalAfterPut); + + final boolean removed = client.remove("testKey", keySerializer); + assertTrue(removed); + + final boolean containedAfterRemove = client.containsKey("testKey", keySerializer); + assertFalse(containedAfterRemove); + + client = null; + clientInitContext = null; + clientContext = null; + Thread.sleep(2000); + System.gc(); + server.shutdownServer(); + } + + @Ignore + @Test + public void testSSLWith2RequestsWithServerTimeout() throws InitializationException, IOException, InterruptedException { + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); + // Create SSLContext Service + final StandardSSLContextService sslService = new StandardSSLContextService(); + final MockControllerServiceInitializationContext sslServerInitContext = new MockControllerServiceInitializationContext(sslService, + "ssl-context"); + sslService.initialize(sslServerInitContext); + + final Map<PropertyDescriptor, String> sslServerProps = new HashMap<>(); + sslServerProps.put(StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); + sslServerProps.put(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + sslServerProps.put(StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + sslServerProps.put(StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + sslServerProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + sslServerProps.put(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + MockConfigurationContext sslServerContext = new MockConfigurationContext(sslServerProps, sslServerInitContext); + sslService.onConfigured(sslServerContext); + sslService.createSSLContext(ClientAuth.REQUIRED); + // Create server + final DistributedMapCacheServer server = new DistributedMapCacheServer(); + final MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); + server.initialize(serverInitContext); + + final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); + serverProperties.put(DistributedMapCacheServer.SSL_CONTEXT_SERVICE, "ssl-context"); + final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); + server.startServer(serverContext); + + DistributedMapCacheClientService client = new DistributedMapCacheClientService(); + MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); + client.initialize(clientInitContext); + + final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); + clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); + clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); + clientProperties.put(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE, "ssl-context"); + MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); + client.cacheConfig(clientContext); + final Serializer<String> valueSerializer = new StringSerializer(); + final Serializer<String> keySerializer = new StringSerializer(); + final Deserializer<String> deserializer = new StringDeserializer(); + + final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer); + assertEquals(null, original); + + Thread.sleep(30000); + try { + final boolean contains = client.containsKey("testKey", keySerializer); + assertTrue(contains); + } catch (IOException e) { + // this is due to the server timing out in the middle of this request + assertTrue(e.getMessage().contains("Channel is closed")); + } + + server.shutdownServer(); + } + + private void waitABit() { + try { + Thread.sleep(10L); + } catch (final InterruptedException e) { + } + } + + private DistributedSetCacheClientService createClient() throws InitializationException { + final DistributedSetCacheClientService client = new DistributedSetCacheClientService(); + MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); + client.initialize(clientInitContext); + + final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); + clientProperties.put(DistributedSetCacheClientService.HOSTNAME, "localhost"); + final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); + client.onConfigured(clientContext); + + return client; + } + + private static class StringSerializer implements Serializer<String> { + @Override + public void serialize(final String value, final OutputStream output) throws SerializationException, IOException { + output.write(value.getBytes(StandardCharsets.UTF_8)); + } + } + + private static class StringDeserializer implements Deserializer<String> { + @Override + public String deserialize(final byte[] input) throws DeserializationException, IOException { + return (input.length == 0) ? null : new String(input, StandardCharsets.UTF_8); + } + } + + private static void deleteRecursively(final File dataFile) throws IOException { + if (dataFile == null || !dataFile.exists()) { + return; + } + + final File[] children = dataFile.listFiles(); + for (final File child : children) { + if (child.isDirectory()) { + deleteRecursively(child); + } else { + for (int i = 0; i < 100 && child.exists(); i++) { + child.delete(); + } + + if (child.exists()) { + throw new IOException("Could not delete " + dataFile.getAbsolutePath()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/resources/localhost-ks.jks ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/resources/localhost-ks.jks b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/resources/localhost-ks.jks new file mode 100755 index 0000000..81be31d Binary files /dev/null and b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/resources/localhost-ks.jks differ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/resources/localhost-ts.jks ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/resources/localhost-ts.jks b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/resources/localhost-ts.jks new file mode 100755 index 0000000..820e1e1 Binary files /dev/null and b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/test/resources/localhost-ts.jks differ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-services-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-services-nar/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-services-nar/pom.xml new file mode 100644 index 0000000..75cab34 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-services-nar/pom.xml @@ -0,0 +1,49 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-services-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>distributed-cache-services-nar</artifactId> + <name>Distributed Cache Services NAR</name> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>standard-services-api-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-client-service</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-protocol</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-server</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/pom.xml b/nar-bundles/distributed-cache-services-bundle/pom.xml new file mode 100644 index 0000000..dcfa541 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/pom.xml @@ -0,0 +1,83 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>standard-services-api-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>distributed-cache-services-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + <name>Distributed Cache Services Bundle</name> + <packaging>pom</packaging> + + <modules> + <module>distributed-cache-protocol</module> + <module>distributed-cache-client-service</module> + <module>distributed-cache-server</module> + <module>distributed-cache-services-nar</module> + </modules> + + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-client-service-api</artifactId> + <version>${standard.services.api.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>ssl-context-service-api</artifactId> + <version>${standard.services.api.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-stream-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>remote-communications-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>0.0.1-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>wali</groupId> + <artifactId>wali</artifactId> + <version>3.0.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>ssl-context-service</artifactId> + <version>0.0.1-SNAPSHOT</version> + <scope>test</scope> + </dependency> + </dependencies> + </dependencyManagement> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/pom.xml b/nar-bundles/execute-script-bundle/execute-script-processors/pom.xml new file mode 100644 index 0000000..a5d3d11 --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/pom.xml @@ -0,0 +1,81 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>execute-script-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <artifactId>execute-script-processors</artifactId> + <description>NiFi Processors to Run Scripts</description> + <name>NiFi Script Execution Processors</name> + <dependencies> + + <dependency> + <groupId>org.jruby</groupId> + <artifactId>jruby</artifactId> + <exclusions> + <exclusion> + <artifactId>jnr-netdb</artifactId> + <groupId>com.github.jnr</groupId> + </exclusion> + <exclusion> + <artifactId>jnr-posix</artifactId> + <groupId>com.github.jnr</groupId> + </exclusion> + <exclusion> + <artifactId>jffi</artifactId> + <groupId>com.github.jnr</groupId> + </exclusion> + <exclusion> + <artifactId>nailgun-server</artifactId> + <groupId>com.martiansoftware</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.python</groupId> + <artifactId>jython-standalone</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-core-flowfile-attributes</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-stream-utils</artifactId> + </dependency> + </dependencies> +</project> + +