This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit f31d72e8758ee1018435612775c73f7addbb6ec4 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Mon Feb 12 14:23:25 2024 +0100 Flush snapshot table on every write and introduce reversed long local partitioner. Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-19391 --- .../org/apache/cassandra/db/SystemKeyspace.java | 22 +- .../dht/ReversedLongLocalPartitioner.java | 224 +++++++++++++++++++++ 2 files changed, 241 insertions(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 9e6b53ee3c..ff54d226b0 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -62,7 +62,6 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.LongType; -import org.apache.cassandra.db.marshal.ReversedType; import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.marshal.TupleType; import org.apache.cassandra.db.marshal.UTF8Type; @@ -73,6 +72,7 @@ import org.apache.cassandra.db.rows.Rows; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.ReversedLongLocalPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.EndpointState; @@ -505,7 +505,7 @@ public final class SystemKeyspace "epoch bigint PRIMARY KEY," + "period bigint," + "snapshot blob)") - .partitioner(new LocalPartitioner(ReversedType.getInstance(LongType.instance))) + .partitioner(ReversedLongLocalPartitioner.instance) .build(); @Deprecated(since = "4.0") @@ -1990,6 +1990,7 @@ public final class SystemKeyspace logger.info("Storing snapshot of cluster 
metadata at epoch {} (period {})", epoch, period); String query = String.format("INSERT INTO %s.%s (epoch, period, snapshot) VALUES (?, ?, ?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME); executeInternal(query, epoch.getEpoch(), period, snapshot); + forceBlockingFlush(SNAPSHOT_TABLE_NAME); } public static ByteBuffer getSnapshot(Epoch epoch) @@ -2003,12 +2004,23 @@ public final class SystemKeyspace } /** - * WARNING: we use token(epoch) >= search in the query below - this is due the fact that we use LocalPartitioner with a reversed LongToken - * and this is not quite supported (yet), so the query is actually `token(epoch) <= search` which is what we want here + * We use ReversedLongLocalPartitioner here, which calculates token as Long.MAX_VALUE - key + * + * table is something like (assuming Long.MAX_VALUE is 1000 for easier calculations...): + * epoch | token(epoch) + * -------------------- + * 100 | 900 + * 90 | 910 + * 80 | 920 + * 70 | 930 + * ... + * + * so to find the latest snapshot at or before epoch 85, we query the table with token(epoch) >= token(85)=915, which gives us + * epoch 80, 70... in that order, so the first row is the latest snapshot at or before `search` */ public static ByteBuffer findSnapshotBefore(Epoch search) { - String query = String.format("SELECT snapshot FROM %s.%s WHERE token(epoch) >= ? LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME); + String query = String.format("SELECT snapshot FROM %s.%s WHERE token(epoch) >= token(?) 
LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME); UntypedResultSet res = executeInternal(query, search.getEpoch()); if (res != null && !res.isEmpty())
diff --git a/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java b/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java
new file mode 100644
index 0000000000..c423ba7e13
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import com.google.common.primitives.Longs;
+
+import org.apache.cassandra.db.CachedHashDecoratedKey;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
+import org.apache.cassandra.utils.memory.HeapCloner;
+
+/**
+ * Special ordered partitioner for the TCM snapshots table, sorting in reverse. Keys are longs (epoch), and tokens
+ * become Long.MAX_VALUE - key. Keys are required to be >= 0.
+ * <p>
+ * Tokens are compared as plain signed longs, so partitions come out in descending key order: a range scan with
+ * {@code token(epoch) >= token(x)} yields the keys {@code <= x}, largest first.
+ * Splitting token ranges and generating random tokens are not supported.
+ */
+public class ReversedLongLocalPartitioner implements IPartitioner
+{
+    public static final ReversedLongLocalPartitioner instance = new ReversedLongLocalPartitioner();
+    // Below every token derived from a valid key: those all lie in [0, Long.MAX_VALUE].
+    private static final ReversedLongLocalToken MIN_TOKEN = new ReversedLongLocalToken(Long.MIN_VALUE);
+    private static final long HEAP_SIZE = ObjectSizes.measure(MIN_TOKEN);
+
+    private ReversedLongLocalPartitioner() {}
+
+    public DecoratedKey decorateKey(ByteBuffer key)
+    {
+        return new CachedHashDecoratedKey(getToken(key), key); // CachedHashDecoratedKey is used for bloom filter hash calculation
+    }
+
+    public Token midpoint(Token left, Token right)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Token split(Token left, Token right, double ratioToLeft)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Token getMinimumToken()
+    {
+        return MIN_TOKEN;
+    }
+
+    /**
+     * Returns a token of {@code Long.MAX_VALUE - key} for a serialized long key, or the minimum token for an empty key.
+     */
+    public Token getToken(ByteBuffer key)
+    {
+        if (!key.hasRemaining())
+            return MIN_TOKEN;
+        // NOTE(review): the clone looks unnecessary if ByteBufferUtil.toLong reads at an absolute index - confirm.
+        long longKey = ByteBufferUtil.toLong(HeapCloner.instance.clone(key));
+        assert longKey >= 0 : "ReversedLongLocalToken only supports non-negative keys, not " + longKey;
+        return new ReversedLongLocalToken(Long.MAX_VALUE - longKey);
+    }
+
+    public Token getRandomToken()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Token getRandomToken(Random random)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Token.TokenFactory getTokenFactory()
+    {
+        return tokenFactory;
+    }
+
+    // The string and byte forms of a token carry the stored (already reversed) value, not the original key.
+    private final Token.TokenFactory tokenFactory = new Token.TokenFactory()
+    {
+        public Token fromComparableBytes(ByteSource.Peekable comparableBytes, ByteComparable.Version version)
+        {
+            long tokenData = ByteSourceInverse.getSignedLong(comparableBytes);
+            return new ReversedLongLocalToken(tokenData);
+        }
+
+        public ByteBuffer toByteArray(Token token)
+        {
+            ReversedLongLocalToken longToken = (ReversedLongLocalToken) token;
+            return ByteBufferUtil.bytes(longToken.token);
+        }
+
+        public Token fromByteArray(ByteBuffer bytes)
+        {
+            return new ReversedLongLocalToken(ByteBufferUtil.toLong(bytes));
+        }
+
+        public String toString(Token token)
+        {
+            return token.toString();
+        }
+
+        public void validate(String token)
+        {
+            LongType.instance.validate(LongType.instance.fromString(token));
+        }
+
+        public Token fromString(String string)
+        {
+            return new ReversedLongLocalToken(Long.parseLong(string));
+        }
+    };
+
+    public boolean preservesOrder()
+    {
+        return true;
+    }
+
+    public Map<Token, Float> describeOwnership(List<Token> sortedTokens)
+    {
+        return Collections.singletonMap(getMinimumToken(), 1.0F);
+    }
+
+    public AbstractType<?> getTokenValidator()
+    {
+        return LongType.instance;
+    }
+
+    // NOTE(review): partitions sort by descending key (token = Long.MAX_VALUE - key), but LongType orders ascending;
+    // confirm whether anything relies on partitionOrdering() for this partitioner.
+    public AbstractType<?> partitionOrdering()
+    {
+        return LongType.instance;
+    }
+
+    /**
+     * Token holding the already reversed value ({@code Long.MAX_VALUE - key}), ordered by signed long comparison.
+     */
+    private static class ReversedLongLocalToken extends Token
+    {
+        private final long token;
+
+        public ReversedLongLocalToken(long token)
+        {
+            this.token = token;
+        }
+
+        @Override
+        public IPartitioner getPartitioner()
+        {
+            return ReversedLongLocalPartitioner.instance;
+        }
+
+        @Override
+        public long getHeapSize()
+        {
+            return HEAP_SIZE;
+        }
+
+        @Override
+        public Object getTokenValue()
+        {
+            return token;
+        }
+
+        @Override
+        public ByteSource asComparableBytes(ByteComparable.Version version)
+        {
+            return ByteSource.of(token);
+        }
+
+        @Override
+        public double size(Token next)
+        {
+            throw new UnsupportedOperationException(String.format("Token type %s does not support token allocation.",
+                                                                  getClass().getSimpleName()));
+        }
+
+        @Override
+        public Token nextValidToken()
+        {
+            throw new UnsupportedOperationException(String.format("Token type %s does not support token allocation.",
+                                                                  getClass().getSimpleName()));
+        }
+
+        @Override
+        public int compareTo(Token o)
+        {
+            return Long.compare(token, ((ReversedLongLocalToken)o).token);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.valueOf(token);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof ReversedLongLocalToken)) return false;
+            ReversedLongLocalToken that = (ReversedLongLocalToken) o;
+            return token == that.token;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Longs.hashCode(token);
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org