This is an automated email from the ASF dual-hosted git repository.

jlewandowski pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5ffe3d50 CEP-15: Add Accord configuration stub
5ffe3d50 is described below

commit 5ffe3d504bb5aa1ff1c2b96d817791e40f7ced0f
Author: Jacek Lewandowski <lewandowski.ja...@gmail.com>
AuthorDate: Mon Oct 9 14:56:57 2023 +0200

    CEP-15: Add Accord configuration stub
    
    Patch by Jacek Lewandowski; reviewed by David Capwell for CASSANDRA-18221
---
 .../src/main/java/accord/config/LocalConfig.java   | 29 ++++++++++++++++
 .../java/accord/config/MutableLocalConfig.java     | 40 ++++++++++++++++++++++
 .../main/java/accord/impl/SimpleProgressLog.java   |  4 ++-
 accord-core/src/main/java/accord/local/Node.java   | 10 +++++-
 accord-core/src/test/java/accord/Utils.java        |  6 +++-
 .../src/test/java/accord/impl/basic/Cluster.java   |  5 ++-
 .../test/java/accord/impl/mock/MockCluster.java    |  6 +++-
 .../java/accord/local/ImmutableCommandTest.java    |  5 ++-
 .../src/main/java/accord/maelstrom/Cluster.java    |  5 ++-
 .../src/main/java/accord/maelstrom/Main.java       |  5 ++-
 10 files changed, 107 insertions(+), 8 deletions(-)

diff --git a/accord-core/src/main/java/accord/config/LocalConfig.java 
b/accord-core/src/main/java/accord/config/LocalConfig.java
new file mode 100644
index 00000000..0ced3b98
--- /dev/null
+++ b/accord-core/src/main/java/accord/config/LocalConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.config;
+
+import java.time.Duration;
+
+public interface LocalConfig
+{
+    default Duration getProgressLogScheduleDelay()
+    {
+        return Duration.ofSeconds(1);
+    }
+}
diff --git a/accord-core/src/main/java/accord/config/MutableLocalConfig.java 
b/accord-core/src/main/java/accord/config/MutableLocalConfig.java
new file mode 100644
index 00000000..f3c42782
--- /dev/null
+++ b/accord-core/src/main/java/accord/config/MutableLocalConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.config;
+
+import java.time.Duration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@VisibleForTesting
+public class MutableLocalConfig implements LocalConfig
+{
+    private volatile Duration progressLogScheduleDelay = 
LocalConfig.super.getProgressLogScheduleDelay();
+
+    public void setProgressLogScheduleDelay(Duration progressLogScheduleDelay)
+    {
+        this.progressLogScheduleDelay = progressLogScheduleDelay;
+    }
+
+    @Override
+    public Duration getProgressLogScheduleDelay()
+    {
+        return progressLogScheduleDelay;
+    }
+}
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java 
b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index 89291a1e..420d54db 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -18,6 +18,7 @@
 
 package accord.impl;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -679,7 +680,8 @@ public class SimpleProgressLog implements 
ProgressLog.Factory
                 return;
 
             isScheduled = true;
-            node.scheduler().once(() -> commandStore.execute(empty(), ignore 
-> run()).begin(commandStore.agent()), 1L, TimeUnit.SECONDS);
+            Duration delay = node.localConfig().getProgressLogScheduleDelay();
+            node.scheduler().once(() -> commandStore.execute(empty(), ignore 
-> run()).begin(commandStore.agent()), delay.toNanos(), TimeUnit.NANOSECONDS);
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 8e4237f1..f0ee2bd1 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -49,6 +49,7 @@ import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.api.Scheduler;
 import accord.api.TopologySorter;
+import accord.config.LocalConfig;
 import accord.coordinate.CoordinateTransaction;
 import accord.coordinate.MaybeRecover;
 import accord.coordinate.Outcome;
@@ -145,6 +146,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
     private final AtomicReference<Timestamp> now;
     private final Agent agent;
     private final RandomSource random;
+    private final LocalConfig localConfig;
 
     // TODO (expected, consider): this really needs to be thought through some 
more, as it needs to be per-instance in some cases, and per-node in others
     private final Scheduler scheduler;
@@ -155,9 +157,10 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
     public Node(Id id, MessageSink messageSink, LocalMessage.Handler 
localMessageHandler,
                 ConfigurationService configService, LongSupplier nowSupplier, 
ToLongFunction<TimeUnit> nowTimeUnit,
                 Supplier<DataStore> dataSupplier, ShardDistributor 
shardDistributor, Agent agent, RandomSource random, Scheduler scheduler, 
TopologySorter.Supplier topologySorter,
-                Function<Node, ProgressLog.Factory> progressLogFactory, 
CommandStores.Factory factory)
+                Function<Node, ProgressLog.Factory> progressLogFactory, 
CommandStores.Factory factory, LocalConfig localConfig)
     {
         this.id = id;
+        this.localConfig = localConfig;
         this.messageSink = messageSink;
         this.localMessageHandler = localMessageHandler;
         this.configService = configService;
@@ -173,6 +176,11 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
         configService.registerListener(this);
     }
 
+    public LocalConfig localConfig()
+    {
+        return localConfig;
+    }
+
     /**
      * This starts the node for tests and makes sure that the provided 
topology is acknowledged correctly.  This method is not
      * safe for production systems as it doesn't handle restarts and partially 
acknowledged histories
diff --git a/accord-core/src/test/java/accord/Utils.java 
b/accord-core/src/test/java/accord/Utils.java
index 77a8bd81..6d42388c 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
 
 import accord.api.MessageSink;
 import accord.api.Scheduler;
+import accord.config.LocalConfig;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.IntKey;
 import accord.impl.SimpleProgressLog;
@@ -38,6 +39,7 @@ import accord.impl.mock.MockStore;
 import accord.local.Node;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
+import accord.config.MutableLocalConfig;
 import accord.messages.LocalMessage;
 import accord.primitives.Keys;
 import accord.primitives.Range;
@@ -140,6 +142,7 @@ public class Utils
     {
         MockStore store = new MockStore();
         Scheduler scheduler = new ThreadPoolScheduler();
+        LocalConfig localConfig = new MutableLocalConfig();
         Node node = new Node(nodeId,
                              messageSink,
                              LocalMessage::process,
@@ -153,7 +156,8 @@ public class Utils
                              scheduler,
                              SizeOfIntersectionSorter.SUPPLIER,
                              SimpleProgressLog::new,
-                             InMemoryCommandStores.Synchronized::new);
+                             InMemoryCommandStores.Synchronized::new,
+                             localConfig);
         awaitUninterruptibly(node.unsafeStart());
         return node;
     }
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index a7eff1cd..efeadfec 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -35,6 +35,7 @@ import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
+import accord.config.LocalConfig;
 import accord.impl.MessageListener;
 import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
@@ -55,6 +56,7 @@ import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
+import accord.config.MutableLocalConfig;
 import accord.messages.LocalMessage;
 import accord.messages.MessageType;
 import accord.messages.Message;
@@ -229,11 +231,12 @@ public class Cluster implements Scheduler
                 MessageSink messageSink = sinks.create(id, 
randomSupplier.get());
                 LongSupplier nowSupplier = nowSupplierSupplier.get();
                 BurnTestConfigurationService configService = new 
BurnTestConfigurationService(id, executor, randomSupplier, topology, 
lookup::get, topologyUpdates);
+                LocalConfig localConfig = new MutableLocalConfig();
                 Node node = new Node(id, messageSink, LocalMessage::process, 
configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, 
nowSupplier),
                                      () -> new ListStore(id), new 
ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
                                      executor.agent(),
                                      randomSupplier.get(), sinks, 
SizeOfIntersectionSorter.SUPPLIER,
-                                     SimpleProgressLog::new, 
DelayedCommandStores.factory(sinks.pending));
+                                     SimpleProgressLog::new, 
DelayedCommandStores.factory(sinks.pending), localConfig);
                 lookup.put(id, node);
                 CoordinateDurabilityScheduling durability = new 
CoordinateDurabilityScheduling(node);
                 // TODO (desired): randomise
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java 
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index fdd8a91b..4b3d198d 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import accord.NetworkFilter;
 import accord.api.MessageSink;
+import accord.config.LocalConfig;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.IntKey;
 import accord.impl.SimpleProgressLog;
@@ -45,6 +46,7 @@ import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
+import accord.config.MutableLocalConfig;
 import accord.messages.Callback;
 import accord.messages.LocalMessage;
 import accord.messages.Reply;
@@ -119,6 +121,7 @@ public class MockCluster implements Network, AutoCloseable, 
Iterable<Node>
         MockStore store = new MockStore();
         MessageSink messageSink = messageSinkFactory.apply(id, this);
         MockConfigurationService configurationService = new 
MockConfigurationService(messageSink, onFetchTopology, topology);
+        LocalConfig localConfig = new MutableLocalConfig();
         Node node = new Node(id,
                              messageSink,
                              LocalMessage::process,
@@ -132,7 +135,8 @@ public class MockCluster implements Network, AutoCloseable, 
Iterable<Node>
                              new ThreadPoolScheduler(),
                              SizeOfIntersectionSorter.SUPPLIER,
                              SimpleProgressLog::new,
-                             InMemoryCommandStores.SingleThread::new);
+                             InMemoryCommandStores.SingleThread::new,
+                             localConfig);
         awaitUninterruptibly(node.unsafeStart());
         node.onTopologyUpdate(topology, true);
         return node;
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java 
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 15061ea5..ef573d11 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -32,6 +32,7 @@ import accord.api.ProgressLog;
 import accord.api.RoutingKey;
 import accord.api.Scheduler;
 import accord.api.TestableConfigurationService;
+import accord.config.LocalConfig;
 import accord.impl.InMemoryCommandStore;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.IntKey;
@@ -43,6 +44,7 @@ import accord.impl.mock.MockConfigurationService;
 import accord.impl.mock.MockStore;
 import accord.local.Node.Id;
 import accord.local.SaveStatus.LocalExecution;
+import accord.config.MutableLocalConfig;
 import accord.primitives.FullKeyRoute;
 import accord.primitives.Keys;
 import accord.primitives.Participants;
@@ -109,10 +111,11 @@ public class ImmutableCommandTest
     private static Node createNode(Id id, CommandStoreSupport storeSupport)
     {
         MockCluster.Clock clock = new MockCluster.Clock(100);
+        LocalConfig localConfig = new MutableLocalConfig();
         Node node = new Node(id, null, null, new 
MockConfigurationService(null, (epoch, service) -> { }, 
storeSupport.local.get()),
                         clock, 
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
                         () -> storeSupport.data, new 
ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new 
TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
-                        SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 
-> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
+                        SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 
-> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new, localConfig);
         awaitUninterruptibly(node.unsafeStart());
         node.onTopologyUpdate(storeSupport.local.get(), true);
         return node;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 1af7d4c2..0184765d 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -41,6 +41,8 @@ import java.util.function.Supplier;
 
 import accord.api.MessageSink;
 import accord.api.Scheduler;
+import accord.config.LocalConfig;
+import accord.config.MutableLocalConfig;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.SimpleProgressLog;
 import accord.impl.SizeOfIntersectionSorter;
@@ -314,12 +316,13 @@ public class Cluster implements Scheduler
             {
                 MessageSink messageSink = sinks.create(node, 
randomSupplier.get());
                 LongSupplier nowSupplier = nowSupplierSupplier.get();
+                LocalConfig localConfig = new MutableLocalConfig();
                 lookup.put(node, new Node(node, messageSink, 
LocalMessage::process, new SimpleConfigService(topology),
                                           nowSupplier, 
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, nowSupplier),
                                           MaelstromStore::new, new 
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                                           MaelstromAgent.INSTANCE,
                                           randomSupplier.get(), sinks, 
SizeOfIntersectionSorter.SUPPLIER,
-                                          SimpleProgressLog::new, 
InMemoryCommandStores.SingleThread::new));
+                                          SimpleProgressLog::new, 
InMemoryCommandStores.SingleThread::new, localConfig));
             }
 
             AsyncResult<?> startup = 
AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()),
 (a, b) -> null).beginAsResult();
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 871123a3..40e73ff2 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -32,6 +32,8 @@ import java.util.function.Supplier;
 
 import accord.api.MessageSink;
 import accord.api.Scheduler;
+import accord.config.LocalConfig;
+import accord.config.MutableLocalConfig;
 import accord.coordinate.Timeout;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.SimpleProgressLog;
@@ -173,11 +175,12 @@ public class Main
             MaelstromInit init = (MaelstromInit) packet.body;
             topology = topologyFactory.toTopology(init.cluster);
             sink = new StdoutSink(System::currentTimeMillis, scheduler, start, 
init.self, out, err);
+            LocalConfig localConfig = new MutableLocalConfig();
             on = new Node(init.self, sink, LocalMessage::process, new 
SimpleConfigService(topology),
                           System::currentTimeMillis, 
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis),
                           MaelstromStore::new, new 
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                           MaelstromAgent.INSTANCE, new DefaultRandom(), 
scheduler, SizeOfIntersectionSorter.SUPPLIER,
-                          SimpleProgressLog::new, 
InMemoryCommandStores.SingleThread::new);
+                          SimpleProgressLog::new, 
InMemoryCommandStores.SingleThread::new, localConfig);
             awaitUninterruptibly(on.unsafeStart());
             err.println("Initialized node " + init.self);
             err.flush();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to