bereng commented on code in PR #3917:
URL: https://github.com/apache/cassandra/pull/3917#discussion_r2079141396
##########
test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java:
##########
@@ -142,19 +172,203 @@ public void testPstmtInvalidation() throws Throwable
createTable("CREATE TABLE %s (key int primary key, val int)");
+ long initialEvicted = numberOfEvictedStatements();
+
for (int cnt = 1; cnt < 10000; cnt++)
{
prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING
TIMESTAMP " + cnt, clientState);
- if (numberOfEvictedStatements() > 0)
+ if (numberOfEvictedStatements() - initialEvicted > 0)
{
+ assertEquals("Number of statements in table and in cache don't
match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
+
+ // prepare more statements to trigger more evictions
+ for (int cnt2 = cnt + 1; cnt2 < cnt + 10; cnt2++)
+ prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?)
USING TIMESTAMP " + cnt2, clientState);
+
+ // each new prepared statement should have caused an eviction
+ assertEquals("eviction count didn't increase by the expected
number", 10, numberOfEvictedStatements() - initialEvicted);
+ assertEquals("Number of statements in memory (expected) and
table (actual) don't match", numberOfStatementsInMemory(),
numberOfStatementsOnDisk());
+
return;
}
}
fail("Prepared statement eviction does not work");
}
+ @Test
+ @BMRules(rules= {
+ @BMRule(name = "CaptureWriteTimestamps",
+ targetClass = "SystemKeyspace",
+ targetMethod = "writePreparedStatement(String, MD5Digest,
String, long)",
+ targetLocation = "AT INVOKE executeInternal",
+ action =
"org.apache.cassandra.cql3.PstmtPersistenceTest.preparedStatementLoadTimestamps.put($key,
$timestamp);"
+ ),
+ @BMRule(name = "CaptureEvictTimestamps",
+ targetClass = "QueryProcessor",
+ targetMethod = "evictPreparedStatement(MD5Digest,
RemovalCause)",
+ action =
"org.apache.cassandra.cql3.PstmtPersistenceTest.preparedStatementRemoveTimestamps.put($key,
org.apache.cassandra.service.ClientState.getTimestamp());"
+ )
+ })
+ public void testAsyncPstmtInvalidation() throws Throwable
+ {
+ ClientState clientState = ClientState.forInternalCalls();
+ createTable("CREATE TABLE %s (key int primary key, val int)");
+
+ // prepare statements concurrently in a thread pool to exercise bug
encountered in CASSANDRA-19703 where
+ // delete from table occurs before the insert due to early eviction.
+ final ExecutorService executor = Executors.newFixedThreadPool(10);
+
+ long initialEvicted = numberOfEvictedStatements();
+ try
+ {
+ int statementsToPrepare = 10000;
+ List<CompletableFuture<MD5Digest>> prepareFutures = new
ArrayList<>(statementsToPrepare);
+ for (int cnt = 1; cnt <= statementsToPrepare; cnt++)
+ {
+ final int localCnt = cnt;
+ prepareFutures.add(CompletableFuture.supplyAsync(() ->
prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " +
localCnt, clientState), executor));
+ }
+
+ // Await completion
+
CompletableFuture.allOf(prepareFutures.toArray(futureArray)).get(10,
TimeUnit.SECONDS);
+
+ long evictedStatements = numberOfEvictedStatements() -
initialEvicted;
+ assertNotEquals("Should have evicted some prepared statements", 0,
evictedStatements);
+
+ // Recorded prepared statement removals should match metrics
+ assertEquals("Actual evicted statements does not match metrics",
evictedStatements, preparedStatementRemoveTimestamps.size());
+
+ // For each prepared statement evicted, assert the time it was
deleted is greater than the timestamp
+ // used for when it was loaded.
+ for (Map.Entry<MD5Digest, Long> evictedStatementEntry :
preparedStatementRemoveTimestamps.entrySet())
+ {
+ MD5Digest key = evictedStatementEntry.getKey();
+ long deletionTimestamp = evictedStatementEntry.getValue();
+ long insertionTimestamp =
preparedStatementLoadTimestamps.get(key);
+
+ assertTrue(String.format("Expected deletion timestamp for
prepared statement (%d) to be greater than insertion timestamp (%d)",
+ deletionTimestamp,
insertionTimestamp),
+ deletionTimestamp > insertionTimestamp);
+ }
+
+ // ensure the number of statements on disk match the number in
memory, if number of statements on disk eclipses in memory, there was a leak.
+ assertEquals("Number of statements in memory (expected) and table
(actual) don't match", numberOfStatementsInMemory(),
numberOfStatementsOnDisk());
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+ }
+
+ /**
+ * Invoked whenever paging happens in testPreloadPreparedStatements,
increments PAGE_INVOCATIONS when we detect
+ * paging happening in the path of
QueryProcessor.preloadPreparedStatements with the expected page size.
+ */
+ @SuppressWarnings("unused")
+ private static void nextPageReadQuery(ReadQuery query, int pageSize)
+ {
+ TableMetadata metadata = query.metadata();
+ if (metadata.keyspace.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) &&
+ metadata.name.equals(SystemKeyspace.PREPARED_STATEMENTS) &&
+ pageSize == PRELOAD_PAGE_SIZE)
+ {
+ for (StackTraceElement stackTraceElement :
Thread.currentThread().getStackTrace())
+ {
+ if
(stackTraceElement.getClassName().equals(QueryProcessor.class.getName()) &&
stackTraceElement.getMethodName().equals("preloadPreparedStatements"))
+ {
+ pageInvocations.incrementAndGet();
+ return;
+ }
+ }
+ }
+ }
+
+ @Test
+ @BMRule(name = "CapturePageInvocations",
+ targetClass = "PartitionRangeQueryPager",
+ targetMethod = "nextPageReadQuery(int)",
+ action =
"org.apache.cassandra.cql3.PstmtPersistenceTest.nextPageReadQuery($this.query,
$pageSize)")
+ public void testPreloadPreparedStatements() throws Throwable
+ {
+ ClientState clientState = ClientState.forInternalCalls();
+ createTable("CREATE TABLE %s (key int primary key, val int)");
+
+ // Prepare more statements than the paging size to ensure paging works
properly.
+ int statementsToPrepare = 750;
+
+ for (int cnt = 1; cnt <= statementsToPrepare; cnt++)
+ {
+ prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING
TIMESTAMP " + cnt, clientState);
+ }
+
+ // Capture how many statements are in memory before clearing cache.
+ long statementsInMemory = numberOfStatementsInMemory();
+ long statementsOnDisk = numberOfStatementsOnDisk();
+ assertEquals(statementsOnDisk, statementsInMemory);
+
+ // Drop prepared statements from cache only and ensure the cache
empties out.
+ QueryProcessor.clearPreparedStatements(true);
+ assertEquals(0, numberOfStatementsInMemory());
+
+ // Load prepared statements and ensure the cache size matches max
+ QueryProcessor.instance.preloadPreparedStatements(PRELOAD_PAGE_SIZE);
+
+ long statementsInMemoryAfterLoading = numberOfStatementsInMemory();
+ // Ensure size of cache matches statements that were on disk before
preload
+ assertEquals("Statements prepared - evicted (expected) does not match
statements in memory (actual)",
+ statementsOnDisk, statementsInMemoryAfterLoading);
+
+ // Number of statements on disk shold match memory
+ assertEquals(statementsInMemoryAfterLoading,
numberOfStatementsOnDisk());
+
+ // Ensure only executed the expected amount of pages.
+ int expectedPageInvocations = (int)
Math.ceil(statementsInMemoryAfterLoading / (double) PRELOAD_PAGE_SIZE);
+ assertEquals(expectedPageInvocations, pageInvocations.get());
+ }
+
+ @Test
+ public void testPreloadPreparedStatementsUntilCacheFull()
+ {
+ ClientState clientState = ClientState.forInternalCalls();
+ createTable("CREATE TABLE %s (key int primary key, val int)");
+
+ // Fill up and clear the prepared statement cache several times to
load up the system.prepared_statements table.
+ // This simulates a 'leak' of prepared statements akin to
CASSANDRA-19703 as the system.prepared_statements
+ // table is able to grow to a larger size than the in memory prepared
statement cache. In such a case we
+ // should detect a possible leak and defer paging indefinitely by
returning early in preloadPreparedStatements.
+ int fillIterations = 5;
+ int cnt = 1;
+ int statementsLoadedBeforeFull = -1;
+ for (int fillIteration = 0; fillIteration < fillIterations;
fillIteration++)
+ {
+ long initialEvicted =
QueryProcessor.metrics.preparedStatementsEvicted.getCount();
+ // load prepared statements until we detect any eviction, on
eviction we assume cache was filled.
+ while (QueryProcessor.metrics.preparedStatementsEvicted.getCount()
== initialEvicted)
+ {
+ prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?)
USING TIMESTAMP " + cnt, clientState);
+ cnt++;
+ }
+
+ if (statementsLoadedBeforeFull == -1) {
Review Comment:
braces formatting
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]