virajjasani commented on code in PR #2211:
URL: https://github.com/apache/phoenix/pull/2211#discussion_r2193685255
##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java:
##########
@@ -208,6 +209,37 @@ public void testStreamMetadataWhenTableIsDropped() throws
SQLException {
Assert.assertTrue(rs.next());
}
+ @Test
+ public void testCDCStreamTTL() throws Exception {
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ createTableAndEnableCDC(conn, tableName, true);
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
+ String sql = "SELECT PARTITION_END_TIME FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'";
+ ResultSet rs = conn.createStatement().executeQuery(sql);
+ int count = 0;
+ while (rs.next()) {
+ count++;
+ }
+ Assert.assertEquals(3, count);
+ ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+ long t = System.currentTimeMillis() +
QueryServicesOptions.DEFAULT_PHOENIX_CDC_STREAM_PARTITION_EXPIRY_MIN_AGE_MS;
+ t = (t / 1000) * 1000;
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.setValue(t);
+ rs = conn.createStatement().executeQuery(sql);
+ int newCount = 0;
+ while (rs.next()) {
+ // parent partition row with non-zero end time should have expired
+ if (rs.getLong(1) > 0) {
+ Assert.fail("Closed partition should have expired after TTL.");
+ }
+ newCount++;
+ }
+ Assert.assertEquals(2, newCount);
+ EnvironmentEdgeManager.reset();
Review Comment:
Let's wrap this in try-finally so that the injected edge is always reset, even if an assertion fails:
```
try {
ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
long t = System.currentTimeMillis() +
QueryServicesOptions.DEFAULT_PHOENIX_CDC_STREAM_PARTITION_EXPIRY_MIN_AGE_MS;
t = (t / 1000) * 1000;
EnvironmentEdgeManager.injectEdge(injectEdge);
injectEdge.setValue(t);
rs = conn.createStatement().executeQuery(sql);
int newCount = 0;
while (rs.next()) {
// parent partition row with non-zero end time should have expired
if (rs.getLong(1) > 0) {
Assert.fail("Closed partition should have expired after TTL.");
}
newCount++;
}
Assert.assertEquals(2, newCount);
} finally {
EnvironmentEdgeManager.reset();
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]