This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9ed9c7cacd Iterate entries in reverse order, as we cannot rely on switch atomicity
9ed9c7cacd is described below
commit 9ed9c7cacd3faac8af83b9f45d18f6c3fdc6d58c
Author: Alex Petrov <[email protected]>
AuthorDate: Thu Aug 7 13:46:06 2025 +0200
Iterate entries in reverse order, as we cannot rely on switch atomicity
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20837
---
.../service/accord/AccordJournalTable.java | 200 ++++++++++-----------
.../cassandra/tools/StandaloneJournalUtil.java | 52 ++++++
.../cassandra/utils/JVMStabilityInspector.java | 3 -
3 files changed, 147 insertions(+), 108 deletions(-)
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index 0e8cff3bb5..3c97b02dcd 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -36,7 +36,6 @@ import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.UncheckedInterruptedException;
-import org.agrona.collections.LongHashSet;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.Operator;
@@ -61,7 +60,6 @@ import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.Bounds;
@@ -73,7 +71,6 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.journal.EntrySerializer.EntryHolder;
import org.apache.cassandra.journal.Journal;
import org.apache.cassandra.journal.KeySupport;
import org.apache.cassandra.journal.RecordConsumer;
@@ -83,10 +80,12 @@ import org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns;
import org.apache.cassandra.service.accord.api.TokenKey;
import org.apache.cassandra.service.accord.serializers.CommandSerializers;
import org.apache.cassandra.service.accord.serializers.Version;
+import org.apache.cassandra.utils.Closeable;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.concurrent.OpOrder;
import static org.apache.cassandra.io.sstable.SSTableReadsListener.NOOP_LISTENER;
import static org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns.getJournalKey;
@@ -221,63 +220,6 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
}
}
- // TODO (expected): this can be removed entirely when we "flush" segments
directly to sstables (but we perhaps need to be careful about the active
segment)
- private class TableRecordConsumer implements RecordConsumer<K>
- {
- final LongHashSet visited;
- final RecordConsumer<K> delegate;
-
- TableRecordConsumer(LongHashSet visited, RecordConsumer<K> delegate)
- {
- this.visited = visited;
- this.delegate = delegate;
- }
-
- boolean visited(long segment)
- {
- return visited != null && visited.contains(segment);
- }
-
- @Override
- public void accept(long segment, int position, K key, ByteBuffer
buffer, int userVersion)
- {
- if (!visited(segment))
- delegate.accept(segment, position, key, buffer, userVersion);
- }
- }
-
- private class JournalAndTableRecordConsumer implements RecordConsumer<K>
- {
- private final K key;
- private final RecordConsumer<K> delegate;
- private LongHashSet visited;
-
- void visit(long segment)
- {
- if (visited == null)
- visited = new LongHashSet();
- visited.add(segment);
- }
-
- JournalAndTableRecordConsumer(K key, RecordConsumer<K> reader)
- {
- this.key = key;
- this.delegate = reader;
- }
-
- void readTable()
- {
- readAllFromTable(key, new TableRecordConsumer(visited, delegate));
- }
-
- @Override
- public void accept(long segment, int position, K key, ByteBuffer
buffer, int userVersion)
- {
- visit(segment);
- delegate.accept(segment, position, key, buffer, userVersion);
- }
- }
-
/**
* When using {@link PartitionRangeReadCommand} we need to work with
{@link RowFilter} which works with columns.
* But the index doesn't care about table based queries and needs to be
queried using the fields in the index, to
@@ -405,71 +347,119 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
*/
public void readAll(K key, Reader reader)
{
- readAll(key, new RecordConsumerAdapter(reader));
+ readAll(key, new RecordConsumerAdapter<>(reader));
}
public void readAll(K key, RecordConsumer<K> reader)
{
- JournalAndTableRecordConsumer consumer = new
JournalAndTableRecordConsumer(key, reader);
- journal.readAll(key, consumer);
- consumer.readTable();
+ try (TableKeyIterator table = readAllFromTable(key))
+ {
+ boolean hasTableData = table.advance();
+ long minSegment = hasTableData ? table.segment : Long.MIN_VALUE;
+ // First, read all journal entries newer than anything flushed
into sstables
+ journal.readAll(key, (segment, position, key1, buffer,
userVersion) -> {
+ if (segment > minSegment)
+ reader.accept(segment, position, key1, buffer,
userVersion);
+ });
+
+ // Then, read SSTables
+ while (hasTableData)
+ {
+ reader.accept(table.segment, table.offset, key, table.value,
table.userVersion);
+ hasTableData = table.advance();
+ }
+ }
}
-
- private void readAllFromTable(K key, TableRecordConsumer onEntry)
+
+ // TODO (expected): why are recordColumn and versionColumn instance
fields, so that this cannot be a static class?
+ class TableKeyIterator implements Closeable, RecordConsumer<K>
{
- DecoratedKey pk = JournalColumns.decorate(key);
- try (RefViewFragment view =
cfs.selectAndReference(View.select(SSTableSet.LIVE, pk)))
+ final K key;
+ final List<UnfilteredRowIterator> unmerged;
+ final UnfilteredRowIterator merged;
+ final OpOrder.Group readOrder;
+
+ long segment;
+ int offset;
+ ByteBuffer value;
+ int userVersion;
+
+ TableKeyIterator(K key, List<UnfilteredRowIterator> unmerged,
UnfilteredRowIterator merged, OpOrder.Group readOrder)
{
- if (view.sstables.isEmpty())
- return;
+ this.key = key;
+ this.unmerged = unmerged;
+ this.merged = merged;
+ this.readOrder = readOrder;
+ }
- List<UnfilteredRowIterator> iters = new ArrayList<>(Math.min(4,
view.sstables.size()));
- try
- {
- for (SSTableReader sstable : view.sstables)
- {
- if (!sstable.mayContainAssumingKeyIsInRange(pk))
- continue;
+ @Override
+ public void accept(long segment, int offset, K key, ByteBuffer buffer,
int userVersion)
+ {
+ this.segment = segment;
+ this.offset = offset;
+ this.value = buffer;
+ this.userVersion = userVersion;
+ }
- UnfilteredRowIterator iter =
StorageHook.instance.makeRowIterator(cfs, sstable, pk, Slices.ALL,
ColumnFilter.all(cfs.metadata()), false, NOOP_LISTENER);
- if (iter.getClass() !=
EmptyIterators.EmptyUnfilteredRowIterator.class)
- iters.add(iter);
- }
+ boolean advance()
+ {
+ if (merged == null || !merged.hasNext())
+ return false;
- if (!iters.isEmpty())
- {
- EntryHolder<K> into = new EntryHolder<>();
- try (UnfilteredRowIterator iter =
UnfilteredRowIterators.merge(iters))
- {
- while (iter.hasNext()) readRow(key, iter.next(), into,
onEntry);
- }
- }
+ try
+ {
+ Row row = (Row) merged.next();
+ segment = LongType.instance.compose(ByteBuffer.wrap((byte[])
row.clustering().get(0)));
+ offset = Int32Type.instance.compose(ByteBuffer.wrap((byte[])
row.clustering().get(1)));
+ value = row.getCell(recordColumn).buffer();
+ userVersion =
Int32Type.instance.compose(row.getCell(versionColumn).buffer());
+ return true;
}
catch (Throwable t)
{
- String message = "Failed to read from " + iters;
- for (UnfilteredRowIterator iter : iters)
- {
- try { iter.close(); }
- catch (Throwable t2) { t.addSuppressed(t2); }
- }
- throw new FSReadError(message, t);
+ throw new FSReadError("Failed to read from " + unmerged, t);
}
}
+
+ @Override
+ public void close()
+ {
+ readOrder.close();
+ if (merged != null)
+ merged.close();
+ }
}
- private void readRow(K key, Unfiltered unfiltered, EntryHolder<K> into,
RecordConsumer<K> onEntry)
+ private TableKeyIterator readAllFromTable(K key)
{
- Invariants.require(unfiltered.isRow());
- Row row = (Row) unfiltered;
+ DecoratedKey pk = JournalColumns.decorate(key);
+ OpOrder.Group readOrder = cfs.readOrdering.start();
+ List<UnfilteredRowIterator> iters = new ArrayList<>(3);
+ try
+ {
+ ColumnFamilyStore.ViewFragment view =
cfs.select(View.select(SSTableSet.LIVE, pk));
+ for (SSTableReader sstable : view.sstables)
+ {
+ if (!sstable.mayContainAssumingKeyIsInRange(pk))
+ continue;
- long descriptor = LongType.instance.compose(ByteBuffer.wrap((byte[])
row.clustering().get(0)));
- int position = Int32Type.instance.compose(ByteBuffer.wrap((byte[])
row.clustering().get(1)));
- into.key = key;
- into.value = row.getCell(recordColumn).buffer();
- into.userVersion =
Int32Type.instance.compose(row.getCell(versionColumn).buffer());
+ UnfilteredRowIterator iter =
StorageHook.instance.makeRowIterator(cfs, sstable, pk, Slices.ALL,
ColumnFilter.all(cfs.metadata()), false, NOOP_LISTENER);
+ if (iter.getClass() !=
EmptyIterators.EmptyUnfilteredRowIterator.class)
+ iters.add(iter);
+ }
- onEntry.accept(descriptor, position, into.key, into.value,
into.userVersion);
+ return new TableKeyIterator(key, iters, iters.isEmpty() ? null :
UnfilteredRowIterators.merge(iters), readOrder);
+ }
+ catch (Throwable t)
+ {
+ readOrder.close();
+ for (UnfilteredRowIterator iter : iters)
+ {
+ try { iter.close(); }
+ catch (Throwable t2) { t.addSuppressed(t2); }
+ }
+ throw t;
+ }
}
@SuppressWarnings("resource") // Auto-closeable iterator will release
related resources
diff --git a/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java
b/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java
index 83db8e23ce..93132c5467 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import accord.utils.Invariants;
import org.apache.cassandra.config.AccordSpec;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.DurationSpec;
@@ -351,4 +352,55 @@ public class StandaloneJournalUtil implements Runnable
}
}
}
+
+ @Command(name = "load", description = "Load item from journal")
+ public static class Load implements Runnable
+ {
+ @Option(names = {"-s", "--sstables"}, description = "Path to sstables")
+ public String sstables;
+
+ @Option(names = {"-j", "--journal-segments"}, description = "Path to
journal segments")
+ public String journalSegments;
+
+ @Option(names = {"-k", "--kind"}, description = "Kind to filter by")
+ public String kind;
+
+ @Option(names = {"-c", "--command-store-id"}, description = "Command
Store id")
+ public String commandStoreId;
+
+ public void run()
+ {
+ if (sstables == null && journalSegments == null)
+ throw new IllegalArgumentException("Either --sstables or
--journal-segments must be provided");
+
+ if (journalSegments == null)
+ {
+ try
+ {
+ journalSegments =
Files.createTempDirectory("dump_journal").getFileName().toString();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ setAccordJournalDirectory(journalSegments);
+ Keyspace.setInitialized();
+ AccordJournal journal = new AccordJournal(new
AccordSpec.JournalSpec().setFlushPeriod(new
DurationSpec.IntMillisecondsBound("1500ms")), new
File(journalSegments).parent(),
Keyspace.open(SchemaConstants.ACCORD_KEYSPACE_NAME).getColumnFamilyStore(AccordKeyspace.JOURNAL));
+
+ Keyspace ks = Schema.instance.getKeyspaceInstance("system_accord");
+ ColumnFamilyStore cfs = ks.getColumnFamilyStore("journal");
+ if (sstables != null)
+ cfs.importNewSSTables(Collections.singleton(sstables), false,
false, false, false, false, false, true);
+
+ journal.start(null);
+ if
(kind.toString().equals(JournalKey.Type.REDUNDANT_BEFORE.toString()))
+ {
+ Invariants.require(commandStoreId != null);
+ int commandStoreId = Integer.parseInt(this.commandStoreId);
+
output.out.println(journal.loadRedundantBefore(commandStoreId));
+ }
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index 0ca992b642..2c840d4a7d 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
-import ch.qos.logback.classic.LoggerContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
@@ -274,8 +273,6 @@ public final class JVMStabilityInspector
if (doExit && killing.compareAndSet(false, true))
{
- if (LoggerFactory.getILoggerFactory() instanceof LoggerContext)
- ((LoggerContext) LoggerFactory.getILoggerFactory()).stop();
StorageService.instance.removeShutdownHook();
System.exit(100);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]