This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f31d72e8758ee1018435612775c73f7addbb6ec4
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Mon Feb 12 14:23:25 2024 +0100

    Flush snapshot table on every write and introduce reversed long local partitioner.
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-19391
---
 .../org/apache/cassandra/db/SystemKeyspace.java    |  22 +-
 .../dht/ReversedLongLocalPartitioner.java          | 224 +++++++++++++++++++++
 2 files changed, 241 insertions(+), 5 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9e6b53ee3c..ff54d226b0 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -62,7 +62,6 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.ReversedType;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.marshal.TupleType;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -73,6 +72,7 @@ import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.ReversedLongLocalPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.EndpointState;
@@ -505,7 +505,7 @@ public final class SystemKeyspace
                                                         "epoch bigint PRIMARY 
KEY," +
                                                         "period bigint," +
                                                         "snapshot blob)")
-                                                  .partitioner(new 
LocalPartitioner(ReversedType.getInstance(LongType.instance)))
+                                                  
.partitioner(ReversedLongLocalPartitioner.instance)
                                                   .build();
 
     @Deprecated(since = "4.0")
@@ -1990,6 +1990,7 @@ public final class SystemKeyspace
         logger.info("Storing snapshot of cluster metadata at epoch {} (period 
{})", epoch, period);
         String query = String.format("INSERT INTO %s.%s (epoch, period, 
snapshot) VALUES (?, ?, ?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, 
SNAPSHOT_TABLE_NAME);
         executeInternal(query, epoch.getEpoch(), period, snapshot);
+        forceBlockingFlush(SNAPSHOT_TABLE_NAME);
     }
 
     public static ByteBuffer getSnapshot(Epoch epoch)
@@ -2003,12 +2004,23 @@ public final class SystemKeyspace
     }
 
     /**
-     * WARNING: we use token(epoch) >= search in the query below - this is due 
the fact that we use LocalPartitioner with a reversed LongToken
-     *          and this is not quite supported (yet), so the query is 
actually `token(epoch) <= search` which is what we want here
+     * We use ReversedLongLocalPartitioner here, which calculates token as 
Long.MAX_VALUE - key
+     *
+     * table is something like (assuming Long.MAX_VALUE is 1000 for easier 
calculations...):
+     * epoch | token(epoch)
+     * --------------------
+     *   100 | 900
+     *    90 | 910
+     *    80 | 920
+     *    70 | 930
+     *    ...
+     *
+     * so to find the first snapshot before epoch 85, we query the table with 
token(epoch) >= token(85)=915. Which gives us
+     * epoch 80, 70... and the first row is the first snapshot before `search`
      */
     public static ByteBuffer findSnapshotBefore(Epoch search)
     {
-        String query = String.format("SELECT snapshot FROM %s.%s WHERE 
token(epoch) >= ? LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, 
SNAPSHOT_TABLE_NAME);
+        String query = String.format("SELECT snapshot FROM %s.%s WHERE 
token(epoch) >= token(?) LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, 
SNAPSHOT_TABLE_NAME);
 
         UntypedResultSet res = executeInternal(query, search.getEpoch());
         if (res != null && !res.isEmpty())
diff --git 
a/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java 
b/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java
new file mode 100644
index 0000000000..c423ba7e13
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import com.google.common.primitives.Longs;
+
+import org.apache.cassandra.db.CachedHashDecoratedKey;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
+import org.apache.cassandra.utils.memory.HeapCloner;
+
+/**
+ * Special ordered partitioner for the TCM snapshots table, sorting in 
reverse. Keys are longs (epoch), and tokens
+ * become Long.MAX_VALUE - key. Keys are required to be >= 0.
+ */
+public class ReversedLongLocalPartitioner implements IPartitioner
+{
+    public static ReversedLongLocalPartitioner instance = new 
ReversedLongLocalPartitioner();
+    private static final ReversedLongLocalToken MIN_TOKEN = new 
ReversedLongLocalToken(Long.MIN_VALUE);
+    private static final long HEAP_SIZE = ObjectSizes.measure(MIN_TOKEN);
+
+    private ReversedLongLocalPartitioner() {}
+
+    public DecoratedKey decorateKey(ByteBuffer key)
+    {
+        return new CachedHashDecoratedKey(getToken(key), key); // 
CachedHashDecoratedKey is used for bloom filter hash calculation
+    }
+
+    public Token midpoint(Token left, Token right)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Token split(Token left, Token right, double ratioToLeft)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Token getMinimumToken()
+    {
+        return MIN_TOKEN;
+    }
+
+    public Token getToken(ByteBuffer key)
+    {
+        if (!key.hasRemaining())
+            return MIN_TOKEN;
+        long longKey = ByteBufferUtil.toLong(HeapCloner.instance.clone(key));
+        assert longKey >= 0 : "ReversedLocalLongToken only supports 
non-negative keys, not " + longKey;
+        return new ReversedLongLocalToken(Long.MAX_VALUE - longKey);
+    }
+
+    public Token getRandomToken()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Token getRandomToken(Random random)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Token.TokenFactory getTokenFactory()
+    {
+        return tokenFactory;
+    }
+
+    private final Token.TokenFactory tokenFactory = new Token.TokenFactory()
+    {
+        public Token fromComparableBytes(ByteSource.Peekable comparableBytes, 
ByteComparable.Version version)
+        {
+            long tokenData = ByteSourceInverse.getSignedLong(comparableBytes);
+            return new ReversedLongLocalToken(tokenData);
+        }
+
+        public ByteBuffer toByteArray(Token token)
+        {
+            ReversedLongLocalToken longToken = (ReversedLongLocalToken) token;
+            return ByteBufferUtil.bytes(longToken.token);
+        }
+
+        public Token fromByteArray(ByteBuffer bytes)
+        {
+            return new ReversedLongLocalToken(ByteBufferUtil.toLong(bytes));
+        }
+
+        public String toString(Token token)
+        {
+            return token.toString();
+        }
+
+        public void validate(String token)
+        {
+            LongType.instance.validate(LongType.instance.fromString(token));
+        }
+
+        public Token fromString(String string)
+        {
+            return new ReversedLongLocalToken(Long.parseLong(string));
+        }
+    };
+
+    public boolean preservesOrder()
+    {
+        return true;
+    }
+
+    public Map<Token, Float> describeOwnership(List<Token> sortedTokens)
+    {
+        return Collections.singletonMap(getMinimumToken(), 1.0F);
+    }
+
+    public AbstractType<?> getTokenValidator()
+    {
+        return LongType.instance;
+    }
+
+    public AbstractType<?> partitionOrdering()
+    {
+        return LongType.instance;
+    }
+
+    private static class ReversedLongLocalToken extends Token
+    {
+        private final long token;
+
+        public ReversedLongLocalToken(long token)
+        {
+            this.token = token;
+        }
+
+        @Override
+        public IPartitioner getPartitioner()
+        {
+            return ReversedLongLocalPartitioner.instance;
+        }
+
+        @Override
+        public long getHeapSize()
+        {
+            return HEAP_SIZE;
+        }
+
+        @Override
+        public Object getTokenValue()
+        {
+            return token;
+        }
+
+        @Override
+        public ByteSource asComparableBytes(ByteComparable.Version version)
+        {
+            return ByteSource.of(token);
+        }
+
+        @Override
+        public double size(Token next)
+        {
+            throw new UnsupportedOperationException(String.format("Token type 
%s does not support token allocation.",
+                                                                  
getClass().getSimpleName()));
+        }
+
+        @Override
+        public Token nextValidToken()
+        {
+            throw new UnsupportedOperationException(String.format("Token type 
%s does not support token allocation.",
+                                                                  
getClass().getSimpleName()));
+        }
+
+        @Override
+        public int compareTo(Token o)
+        {
+            return Long.compare(token, ((ReversedLongLocalToken)o).token);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.valueOf(token);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof ReversedLongLocalToken)) return false;
+            ReversedLongLocalToken that = (ReversedLongLocalToken) o;
+            return token == that.token;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Longs.hashCode(token);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to