ifesdjeen commented on code in PR #3743:
URL: https://github.com/apache/cassandra/pull/3743#discussion_r1916715105
##########
test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java:
##########
Review Comment:
Since the index is called participant index, should we also call this test
participant index test, or maybe should we rename test? Consistency will help
discoverability.
##########
src/java/org/apache/cassandra/index/accord/ParticipantsJournalIndex.java:
##########
@@ -108,42 +111,57 @@ public enum RegisterStatus
private volatile boolean initBuildStarted = false;
private volatile RegisterStatus registerStatus = RegisterStatus.PENDING;
- public RouteIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+ public ParticipantsJournalIndex(ColumnFamilyStore baseCfs, IndexMetadata
indexMetadata)
{
- if
(!SchemaConstants.ACCORD_KEYSPACE_NAME.equals(baseCfs.getKeyspaceName()))
- throw new IllegalArgumentException("Route index is only allowed
for accord commands table; given " + baseCfs().metadata());
- if (!AccordKeyspace.COMMANDS.equals(baseCfs.name))
- throw new IllegalArgumentException("Route index is only allowed
for accord commands table; given " + baseCfs().metadata());
-
- TableMetadata tableMetadata = baseCfs.metadata();
- Pair<ColumnMetadata, IndexTarget.Type> target =
TargetParser.parse(tableMetadata, indexMetadata);
- if
(!AccordKeyspace.CommandsColumns.participants.name.equals(target.left.name))
- throw new IllegalArgumentException("Attempted to index the wrong
column; needed " + AccordKeyspace.CommandsColumns.participants.name + " but
given " + target.left.name);
-
- if (target.right != IndexTarget.Type.VALUES)
- throw new IllegalArgumentException("Attempted to index " +
AccordKeyspace.CommandsColumns.participants.name + " with index type " +
target.right + "; only " + IndexTarget.Type.VALUES + " is supported");
+ validateTargets(baseCfs, indexMetadata);
this.baseCfs = baseCfs;
+ // type is only IndexTarget.Type.VALUES
+ this.record = AccordKeyspace.JournalColumns.record;
Review Comment:
nit: maybe use them straight from journal columns? why have them as separate
fields here
##########
src/java/org/apache/cassandra/service/accord/AccordCache.java:
##########
@@ -1190,7 +1190,7 @@ public Runnable save(AccordCommandStore commandStore,
TxnId txnId, @Nullable Com
return null;
}
- return commandStore.appendToKeyspace(txnId, value);
+ return null;
Review Comment:
Do I understand it correctly though that since the command is already in the
log, we basically already cached it?
##########
src/java/org/apache/cassandra/index/accord/ParticipantsJournalIndex.java:
##########
@@ -108,42 +111,57 @@ public enum RegisterStatus
private volatile boolean initBuildStarted = false;
private volatile RegisterStatus registerStatus = RegisterStatus.PENDING;
- public RouteIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+ public ParticipantsJournalIndex(ColumnFamilyStore baseCfs, IndexMetadata
indexMetadata)
{
- if
(!SchemaConstants.ACCORD_KEYSPACE_NAME.equals(baseCfs.getKeyspaceName()))
- throw new IllegalArgumentException("Route index is only allowed
for accord commands table; given " + baseCfs().metadata());
- if (!AccordKeyspace.COMMANDS.equals(baseCfs.name))
- throw new IllegalArgumentException("Route index is only allowed
for accord commands table; given " + baseCfs().metadata());
-
- TableMetadata tableMetadata = baseCfs.metadata();
- Pair<ColumnMetadata, IndexTarget.Type> target =
TargetParser.parse(tableMetadata, indexMetadata);
- if
(!AccordKeyspace.CommandsColumns.participants.name.equals(target.left.name))
- throw new IllegalArgumentException("Attempted to index the wrong
column; needed " + AccordKeyspace.CommandsColumns.participants.name + " but
given " + target.left.name);
-
- if (target.right != IndexTarget.Type.VALUES)
- throw new IllegalArgumentException("Attempted to index " +
AccordKeyspace.CommandsColumns.participants.name + " with index type " +
target.right + "; only " + IndexTarget.Type.VALUES + " is supported");
+ validateTargets(baseCfs, indexMetadata);
this.baseCfs = baseCfs;
+ // type is only IndexTarget.Type.VALUES
+ this.record = AccordKeyspace.JournalColumns.record;
+ this.user_version = AccordKeyspace.JournalColumns.user_version;
this.indexMetadata = indexMetadata;
+
this.memtableIndexManager = new RouteMemtableIndexManager(this);
this.sstableManager = new RouteSSTableManager();
this.indexMetrics = new IndexMetrics(this);
- this.column = target.left;
Tracker tracker = baseCfs.getTracker();
tracker.subscribe(this);
}
- public ColumnMetadata column()
+ public static boolean allowed(JournalKey id)
+ {
+ return id.type == JournalKey.Type.COMMAND_DIFF && allowed(id.id);
+ }
+
+ public static boolean allowed(TxnId id)
{
- return column;
+ return id.domain().isRange();
+ }
+
+ private static void validateTargets(ColumnFamilyStore baseCfs,
IndexMetadata indexMetadata)
+ {
+ // this contains 2 columns....
+ if
(!SchemaConstants.ACCORD_KEYSPACE_NAME.equals(baseCfs.getKeyspaceName()))
+ throw new IllegalArgumentException("Route index is only allowed
for accord journal table; given " + baseCfs.metadata());
+ if (!AccordKeyspace.JOURNAL.equals(baseCfs.name))
+ throw new IllegalArgumentException("Route index is only allowed
for accord journal table; given " + baseCfs.metadata());
+ Set<String> columns =
Splitter.on(',').trimResults().omitEmptyStrings().splitToStream(indexMetadata.options.get("target")).collect(Collectors.toSet());
+ Set<String> expected = Set.of("record", "user_version");
+ if (!expected.equals(columns))
+ throw new IllegalArgumentException("Route index is only allowed
for accord journal table, and on the record/user_value columns; given " +
baseCfs.metadata() + " and columns " + columns);
}
public IndexMetrics indexMetrics()
{
return indexMetrics;
}
+ public RegisterStatus registerStatus()
+ {
+ return registerStatus;
+ }
+
Review Comment:
Could not add a comment directly to L201, so placing here. It looks like
`starting` is an unused boolean; and I think we do not need
`forceBlockingFlush` anymore.
##########
src/java/org/apache/cassandra/index/accord/ParticipantsJournalIndex.java:
##########
@@ -407,7 +416,7 @@ public Searcher searcherFor(ReadCommand command)
return null;
}
}
- else if (e.column() == AccordKeyspace.CommandsColumns.store_id &&
e.operator() == Operator.EQ)
+ else if (e.column() ==
AccordJournalTable.SyntheticColumn.store_id.metadata && e.operator() ==
Operator.EQ)
Review Comment:
Shouldw e maybe forbid anything except `EQ` for the store?
##########
src/java/org/apache/cassandra/index/accord/ParticipantsJournalIndex.java:
##########
Review Comment:
In this class, in `#getInitializationTask`, there is `forceBlockingFlush`
task. Journal tables do not have memtables, so I would either remove this code,
or replace it with the logic that would assert that memtables are empty.
##########
src/java/org/apache/cassandra/index/accord/ParticipantsJournalIndex.java:
##########
Review Comment:
(unresolved because of the pending comments)
##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -906,8 +941,19 @@ private String maybeAddDiskSpaceContext(String message)
@VisibleForTesting
public void truncateForTesting()
{
- advanceSegment(null);
- segments.set(Segments.none());
+ ActiveSegment<?, ?> discarding = currentSegment;
+ if (discarding.index.size() > 0) // if there is no data in the
segement then ignore it
+ {
+ closeCurrentSegmentForTestingIfNonEmpty();
+ // wait for the ActiveSegment to get released, else can see weird
race conditions;
+ // this thread will see the static segmenet and will release it
(which will delete the file),
+ // and the sync thread will then try to release and will fail as
the file no longer exists...
+ while (discarding.selfRef().globalCount() > 0) {}
Review Comment:
I understand; I am just saying that this is OK as a dev solution, but we
need to actually solve this problem. Would you mind to convert this into `TODO
(now)` if you are not going to include the fix to this patch?
##########
src/java/org/apache/cassandra/index/accord/ParticipantsJournalIndex.java:
##########
@@ -494,6 +503,50 @@ NavigableSet<ByteBuffer> search(int storeId,
};
}
+ @Override
+ public void handleNotification(INotification notification, Object sender)
+ {
+ // unfortunately, we can only check the type of notification via
instanceof :(
+ if (notification instanceof SSTableAddedNotification)
+ {
+ SSTableAddedNotification notice = (SSTableAddedNotification)
notification;
+ sstableManager.onSSTableChanged(Collections.emptySet(),
notice.added);
+ }
+ else if (notification instanceof SSTableListChangedNotification)
+ {
+ SSTableListChangedNotification notice =
(SSTableListChangedNotification) notification;
+ sstableManager.onSSTableChanged(notice.removed, notice.added);
+ }
+ else if (notification instanceof MemtableRenewedNotification)
Review Comment:
I _think_ we should never get any of these. Should we try throwing on
memtable renewed and discarded?
##########
src/java/org/apache/cassandra/index/accord/ParticipantsJournalIndex.java:
##########
Review Comment:
What would generally happen if the index is non-queriable? We probably need
to throw, since if there's a chance we can return empty resultset, it can be a
severe correctness violation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]