This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch CASSANDRA-18519
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 2a34e823cd2b8d53b15d9fa7a5aef9eb3ebc248b
Author: David Capwell <dcapw...@gmail.com>
AuthorDate: Wed May 10 16:24:20 2023 -0700

    checkpoint
---
 .../src/main/java/accord/local/CommandStores.java  | 12 +++++
 .../src/main/java/accord/primitives/Timestamp.java | 10 ++++
 .../src/main/java/accord/primitives/TxnId.java     |  7 +++
 .../main/java/accord/utils/async/Observable.java   | 63 ++++++++++++++++++++++
 4 files changed, 92 insertions(+)

diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
index b8203a1f..1a37c910 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -554,6 +554,18 @@ public abstract class CommandStores
         return snapshot.byId.get(id);
     }
 
+    /**
+     * Collects the keys of the current snapshot's {@code byId} map.
+     *
+     * @return a newly allocated array holding those keys in ascending order
+     */
+    public int[] ids()
+    {
+        // take a single reference to current so size() and the loop use the same Snapshot object
+        Snapshot view = current;
+        Int2ObjectHashMap<CommandStore>.KeySet keys = view.byId.keySet();
+        int[] sorted = new int[keys.size()];
+        int next = 0;
+        for (int id : keys)
+        {
+            sorted[next] = id;
+            next++;
+        }
+        Arrays.sort(sorted);
+        return sorted;
+    }
+
     public int count()
     {
         return current.shards.length;
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 13bc2a55..1a5a44e4 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -345,4 +345,14 @@ public class Timestamp implements Comparable<Timestamp>
     {
         return "[" + epoch() + ',' + hlc() + ',' + flags() + ',' + node + ']';
     }
+
+    /**
+     * Parses text of the form {@code [epoch,hlc,flags,node]}, the layout written by
+     * {@link #toString()}.
+     * NOTE(review): assumes the node {@code Id} prints as its plain integer value - confirm.
+     *
+     * @param string four comma-separated numbers; the first {@code '['} and the first
+     *               {@code ']'} found in the text are dropped before splitting
+     * @return the timestamp built from the parsed epoch, hlc, flags and node id
+     * @throws IllegalArgumentException if the text does not split into exactly four fields
+     * @throws NumberFormatException if a field cannot be parsed as a number
+     */
+    public static Timestamp fromString(String string)
+    {
+        String[] split = string.replaceFirst("\\[", "").replaceFirst("\\]", "").split(",");
+        // checked explicitly rather than with assert: assertions are skipped unless the JVM runs with -ea
+        if (split.length != 4)
+            throw new IllegalArgumentException("Expected [epoch,hlc,flags,node] but found " + split.length + " field(s) in: " + string);
+        return Timestamp.fromValues(Long.parseLong(split[0]),
+                                    Long.parseLong(split[1]),
+                                    Integer.parseInt(split[2]),
+                                    new Id(Integer.parseInt(split[3])));
+    }
 }
diff --git a/accord-core/src/main/java/accord/primitives/TxnId.java b/accord-core/src/main/java/accord/primitives/TxnId.java
index b46602db..d2783f16 100644
--- a/accord-core/src/main/java/accord/primitives/TxnId.java
+++ b/accord-core/src/main/java/accord/primitives/TxnId.java
@@ -35,6 +35,13 @@ public class TxnId extends Timestamp
         return new TxnId(msb, lsb, node);
     }
 
+    /**
+     * Converts a timestamp into a {@code TxnId}.
+     *
+     * @param t the timestamp to convert
+     * @return {@code t} itself when it already is a {@code TxnId}; otherwise a new
+     *         {@code TxnId} built from its epoch, hlc, flags and node
+     */
+    public static TxnId fromTimestamp(Timestamp t)
+    {
+        return t instanceof TxnId
+               ? (TxnId) t
+               : new TxnId(t.epoch(), t.hlc(), t.flags(), t.node);
+    }
+
     public static TxnId fromValues(long epoch, long hlc, Id node)
     {
         return new TxnId(epoch, hlc, 0, node);
diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java b/accord-core/src/main/java/accord/utils/async/Observable.java
new file mode 100644
index 00000000..ac005e24
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/async/Observable.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.function.Function;
+
+/**
+ * Receiver of values delivered through {@link #onNext(Object)}.
+ * {@link #onError(Throwable)} and {@link #onCompleted()} are optional hooks whose default
+ * implementations do nothing.
+ *
+ * @param <T> type of value accepted by {@link #onNext(Object)}
+ */
+public interface Observable<T>
+{
+    /** Accepts one value. */
+    void onNext(T value);
+
+    /** Failure hook; does nothing unless overridden. */
+    default void onError(Throwable t)
+    {
+    }
+
+    /** Completion hook; does nothing unless overridden. */
+    default void onCompleted()
+    {
+    }
+
+    /**
+     * Adapts this observable so that it can be fed values of another type.
+     * Note the direction: {@code mapper} converts the values given to the returned
+     * observable into the type this observable accepts.
+     *
+     * @param mapper converts each incoming {@code R} into a {@code T}
+     * @param <R> type accepted by the returned observable
+     * @return an observable that applies {@code mapper} to every value and hands the result
+     *         to this observable; errors and completion are passed through unchanged
+     */
+    default <R> Observable<R> map(Function<? super R, ? extends T> mapper)
+    {
+        return new Map<R, T>(this, mapper);
+    }
+
+    /**
+     * Observable of {@code A} that converts each value to {@code B} and forwards it to
+     * another observable.
+     */
+    class Map<A, B> implements Observable<A>
+    {
+        private final Observable<B> downstream;
+        private final Function<? super A, ? extends B> transform;
+
+        /**
+         * @param next   observable receiving the converted values and the forwarded signals
+         * @param mapper conversion applied to every value passed to {@link #onNext(Object)}
+         */
+        public Map(Observable<B> next, Function<? super A, ? extends B> mapper)
+        {
+            downstream = next;
+            transform = mapper;
+        }
+
+        /** Converts {@code value} and passes the result on. */
+        @Override
+        public void onNext(A value)
+        {
+            B converted = transform.apply(value);
+            downstream.onNext(converted);
+        }
+
+        /** Passes the failure on untouched. */
+        @Override
+        public void onError(Throwable t)
+        {
+            downstream.onError(t);
+        }
+
+        /** Passes the completion signal on. */
+        @Override
+        public void onCompleted()
+        {
+            downstream.onCompleted();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to