This is an automated email from the ASF dual-hosted git repository. jlewandowski pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 5ffe3d50 CEP-15: Add Accord configuration stub 5ffe3d50 is described below commit 5ffe3d504bb5aa1ff1c2b96d817791e40f7ced0f Author: Jacek Lewandowski <lewandowski.ja...@gmail.com> AuthorDate: Mon Oct 9 14:56:57 2023 +0200 CEP-15: Add Accord configuration stub Patch by Jacek Lewandowski; reviewed by David Capwell for CASSANDRA-18221 --- .../src/main/java/accord/config/LocalConfig.java | 29 ++++++++++++++++ .../java/accord/config/MutableLocalConfig.java | 40 ++++++++++++++++++++++ .../main/java/accord/impl/SimpleProgressLog.java | 4 ++- accord-core/src/main/java/accord/local/Node.java | 10 +++++- accord-core/src/test/java/accord/Utils.java | 6 +++- .../src/test/java/accord/impl/basic/Cluster.java | 5 ++- .../test/java/accord/impl/mock/MockCluster.java | 6 +++- .../java/accord/local/ImmutableCommandTest.java | 5 ++- .../src/main/java/accord/maelstrom/Cluster.java | 5 ++- .../src/main/java/accord/maelstrom/Main.java | 5 ++- 10 files changed, 107 insertions(+), 8 deletions(-) diff --git a/accord-core/src/main/java/accord/config/LocalConfig.java b/accord-core/src/main/java/accord/config/LocalConfig.java new file mode 100644 index 00000000..0ced3b98 --- /dev/null +++ b/accord-core/src/main/java/accord/config/LocalConfig.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.config; + +import java.time.Duration; + +public interface LocalConfig +{ + default Duration getProgressLogScheduleDelay() + { + return Duration.ofSeconds(1); + } +} diff --git a/accord-core/src/main/java/accord/config/MutableLocalConfig.java b/accord-core/src/main/java/accord/config/MutableLocalConfig.java new file mode 100644 index 00000000..f3c42782 --- /dev/null +++ b/accord-core/src/main/java/accord/config/MutableLocalConfig.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.config; + +import java.time.Duration; + +import com.google.common.annotations.VisibleForTesting; + +@VisibleForTesting +public class MutableLocalConfig implements LocalConfig +{ + private volatile Duration progressLogScheduleDelay = LocalConfig.super.getProgressLogScheduleDelay(); + + public void setProgressLogScheduleDelay(Duration progressLogScheduleDelay) + { + this.progressLogScheduleDelay = progressLogScheduleDelay; + } + + @Override + public Duration getProgressLogScheduleDelay() + { + return progressLogScheduleDelay; + } +} diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java index 89291a1e..420d54db 100644 --- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java +++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java @@ -18,6 +18,7 @@ package accord.impl; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -679,7 +680,8 @@ public class SimpleProgressLog implements ProgressLog.Factory return; isScheduled = true; - node.scheduler().once(() -> commandStore.execute(empty(), ignore -> run()).begin(commandStore.agent()), 1L, TimeUnit.SECONDS); + Duration delay = node.localConfig().getProgressLogScheduleDelay(); + node.scheduler().once(() -> commandStore.execute(empty(), ignore -> run()).begin(commandStore.agent()), delay.toNanos(), TimeUnit.NANOSECONDS); } @Override diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 8e4237f1..f0ee2bd1 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -49,6 +49,7 @@ import accord.api.Result; import accord.api.RoutingKey; import accord.api.Scheduler; import accord.api.TopologySorter; +import accord.config.LocalConfig; import accord.coordinate.CoordinateTransaction; import accord.coordinate.MaybeRecover; import 
accord.coordinate.Outcome; @@ -145,6 +146,7 @@ public class Node implements ConfigurationService.Listener, NodeTimeService private final AtomicReference<Timestamp> now; private final Agent agent; private final RandomSource random; + private final LocalConfig localConfig; // TODO (expected, consider): this really needs to be thought through some more, as it needs to be per-instance in some cases, and per-node in others private final Scheduler scheduler; @@ -155,9 +157,10 @@ public class Node implements ConfigurationService.Listener, NodeTimeService public Node(Id id, MessageSink messageSink, LocalMessage.Handler localMessageHandler, ConfigurationService configService, LongSupplier nowSupplier, ToLongFunction<TimeUnit> nowTimeUnit, Supplier<DataStore> dataSupplier, ShardDistributor shardDistributor, Agent agent, RandomSource random, Scheduler scheduler, TopologySorter.Supplier topologySorter, - Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory) + Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory, LocalConfig localConfig) { this.id = id; + this.localConfig = localConfig; this.messageSink = messageSink; this.localMessageHandler = localMessageHandler; this.configService = configService; @@ -173,6 +176,11 @@ public class Node implements ConfigurationService.Listener, NodeTimeService configService.registerListener(this); } + public LocalConfig localConfig() + { + return localConfig; + } + /** * This starts the node for tests and makes sure that the provided topology is acknowledged correctly. 
This method is not * safe for production systems as it doesn't handle restarts and partially acknowledged histories diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java index 77a8bd81..6d42388c 100644 --- a/accord-core/src/test/java/accord/Utils.java +++ b/accord-core/src/test/java/accord/Utils.java @@ -27,6 +27,7 @@ import com.google.common.collect.Sets; import accord.api.MessageSink; import accord.api.Scheduler; +import accord.config.LocalConfig; import accord.impl.InMemoryCommandStores; import accord.impl.IntKey; import accord.impl.SimpleProgressLog; @@ -38,6 +39,7 @@ import accord.impl.mock.MockStore; import accord.local.Node; import accord.local.NodeTimeService; import accord.local.ShardDistributor; +import accord.config.MutableLocalConfig; import accord.messages.LocalMessage; import accord.primitives.Keys; import accord.primitives.Range; @@ -140,6 +142,7 @@ public class Utils { MockStore store = new MockStore(); Scheduler scheduler = new ThreadPoolScheduler(); + LocalConfig localConfig = new MutableLocalConfig(); Node node = new Node(nodeId, messageSink, LocalMessage::process, @@ -153,7 +156,8 @@ public class Utils scheduler, SizeOfIntersectionSorter.SUPPLIER, SimpleProgressLog::new, - InMemoryCommandStores.Synchronized::new); + InMemoryCommandStores.Synchronized::new, + localConfig); awaitUninterruptibly(node.unsafeStart()); return node; } diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index a7eff1cd..efeadfec 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -35,6 +35,7 @@ import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import accord.config.LocalConfig; import accord.impl.MessageListener; import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; @@ -55,6 +56,7 @@ import 
accord.local.Node; import accord.local.Node.Id; import accord.local.NodeTimeService; import accord.local.ShardDistributor; +import accord.config.MutableLocalConfig; import accord.messages.LocalMessage; import accord.messages.MessageType; import accord.messages.Message; @@ -229,11 +231,12 @@ public class Cluster implements Scheduler MessageSink messageSink = sinks.create(id, randomSupplier.get()); LongSupplier nowSupplier = nowSupplierSupplier.get(); BurnTestConfigurationService configService = new BurnTestConfigurationService(id, executor, randomSupplier, topology, lookup::get, topologyUpdates); + LocalConfig localConfig = new MutableLocalConfig(); Node node = new Node(id, messageSink, LocalMessage::process, configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier), () -> new ListStore(id), new ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()), executor.agent(), randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, - SimpleProgressLog::new, DelayedCommandStores.factory(sinks.pending)); + SimpleProgressLog::new, DelayedCommandStores.factory(sinks.pending), localConfig); lookup.put(id, node); CoordinateDurabilityScheduling durability = new CoordinateDurabilityScheduling(node); // TODO (desired): randomise diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java index fdd8a91b..4b3d198d 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java +++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; import accord.NetworkFilter; import accord.api.MessageSink; +import accord.config.LocalConfig; import accord.impl.InMemoryCommandStores; import accord.impl.IntKey; import accord.impl.SimpleProgressLog; @@ -45,6 +46,7 @@ import accord.local.Node; import accord.local.Node.Id; import accord.local.NodeTimeService; import accord.local.ShardDistributor; +import 
accord.config.MutableLocalConfig; import accord.messages.Callback; import accord.messages.LocalMessage; import accord.messages.Reply; @@ -119,6 +121,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> MockStore store = new MockStore(); MessageSink messageSink = messageSinkFactory.apply(id, this); MockConfigurationService configurationService = new MockConfigurationService(messageSink, onFetchTopology, topology); + LocalConfig localConfig = new MutableLocalConfig(); Node node = new Node(id, messageSink, LocalMessage::process, @@ -132,7 +135,8 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> new ThreadPoolScheduler(), SizeOfIntersectionSorter.SUPPLIER, SimpleProgressLog::new, - InMemoryCommandStores.SingleThread::new); + InMemoryCommandStores.SingleThread::new, + localConfig); awaitUninterruptibly(node.unsafeStart()); node.onTopologyUpdate(topology, true); return node; diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java index 15061ea5..ef573d11 100644 --- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java +++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java @@ -32,6 +32,7 @@ import accord.api.ProgressLog; import accord.api.RoutingKey; import accord.api.Scheduler; import accord.api.TestableConfigurationService; +import accord.config.LocalConfig; import accord.impl.InMemoryCommandStore; import accord.impl.InMemoryCommandStores; import accord.impl.IntKey; @@ -43,6 +44,7 @@ import accord.impl.mock.MockConfigurationService; import accord.impl.mock.MockStore; import accord.local.Node.Id; import accord.local.SaveStatus.LocalExecution; +import accord.config.MutableLocalConfig; import accord.primitives.FullKeyRoute; import accord.primitives.Keys; import accord.primitives.Participants; @@ -109,10 +111,11 @@ public class ImmutableCommandTest private static Node createNode(Id id, 
CommandStoreSupport storeSupport) { MockCluster.Clock clock = new MockCluster.Clock(100); + LocalConfig localConfig = new MutableLocalConfig(); Node node = new Node(id, null, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()), clock, NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock), () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED, - SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new); + SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new, localConfig); awaitUninterruptibly(node.unsafeStart()); node.onTopologyUpdate(storeSupport.local.get(), true); return node; diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index 1af7d4c2..0184765d 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -41,6 +41,8 @@ import java.util.function.Supplier; import accord.api.MessageSink; import accord.api.Scheduler; +import accord.config.LocalConfig; +import accord.config.MutableLocalConfig; import accord.impl.InMemoryCommandStores; import accord.impl.SimpleProgressLog; import accord.impl.SizeOfIntersectionSorter; @@ -314,12 +316,13 @@ public class Cluster implements Scheduler { MessageSink messageSink = sinks.create(node, randomSupplier.get()); LongSupplier nowSupplier = nowSupplierSupplier.get(); + LocalConfig localConfig = new MutableLocalConfig(); lookup.put(node, new Node(node, messageSink, LocalMessage::process, new SimpleConfigService(topology), nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, nowSupplier), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new 
MaelstromKey.Splitter()), MaelstromAgent.INSTANCE, randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, - SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new)); + SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new, localConfig)); } AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()), (a, b) -> null).beginAsResult(); diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java index 871123a3..40e73ff2 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java @@ -32,6 +32,8 @@ import java.util.function.Supplier; import accord.api.MessageSink; import accord.api.Scheduler; +import accord.config.LocalConfig; +import accord.config.MutableLocalConfig; import accord.coordinate.Timeout; import accord.impl.InMemoryCommandStores; import accord.impl.SimpleProgressLog; @@ -173,11 +175,12 @@ public class Main MaelstromInit init = (MaelstromInit) packet.body; topology = topologyFactory.toTopology(init.cluster); sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err); + LocalConfig localConfig = new MutableLocalConfig(); on = new Node(init.self, sink, LocalMessage::process, new SimpleConfigService(topology), System::currentTimeMillis, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()), MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER, - SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new); + SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new, localConfig); awaitUninterruptibly(on.unsafeStart()); err.println("Initialized node " + init.self); err.flush(); --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org