haridsv commented on code in PR #2419:
URL: https://github.com/apache/phoenix/pull/2419#discussion_r3137816864
##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -223,15 +225,15 @@ public List<PhoenixSyncTableCheckpointOutputRow> getProcessedChunks(String table
ps.setString(paramIndex++, tenantId);
}
if (hasEndBoundary) {
- ps.setBytes(paramIndex++, mapperRegionEnd);
+ ps.setString(paramIndex++, bytesToHex(mapperRegionEnd));
}
if (hasStartBoundary) {
- ps.setBytes(paramIndex, mapperRegionStart);
+ ps.setString(paramIndex, bytesToHex(mapperRegionStart));
}
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
- byte[] rawStartKey = rs.getBytes("START_ROW_KEY");
- byte[] endRowKey = rs.getBytes("END_ROW_KEY");
+ byte[] rawStartKey = hexToBytes(rs.getString("START_ROW_KEY"));
+ byte[] endRowKey = hexToBytes(rs.getString("END_ROW_KEY"));
Review Comment:
I would recommend using Base64 encoding, which is a lot more efficient than
hex (hex always doubles the size, while Base64 adds only about 33%).
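For illustration, a minimal sketch of what that could look like with `java.util.Base64` (the `bytesToBase64`/`base64ToBytes` helper names are hypothetical, not from this PR):

```java
import java.util.Base64;

final class RowKeyCodec {
  // Hypothetical replacements for bytesToHex/hexToBytes: Base64 adds ~33%
  // overhead per row key instead of the 100% overhead of hex encoding.
  static String bytesToBase64(byte[] rowKey) {
    return rowKey == null ? null : Base64.getEncoder().encodeToString(rowKey);
  }

  static byte[] base64ToBytes(String encoded) {
    return encoded == null ? null : Base64.getDecoder().decode(encoded);
  }
}
```

The call sites would then read something like `ps.setString(paramIndex++, RowKeyCodec.bytesToBase64(mapperRegionEnd))` and `RowKeyCodec.base64ToBytes(rs.getString("START_ROW_KEY"))`.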
##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java:
##########
@@ -207,80 +208,36 @@ public void testSyncTableWithDeletedRowsOnTarget() throws Exception {
validateMapperCounters(counters, 1, 3);
}
- @Test
- public void testSyncTableWithConditionalTTLExpiredRows() throws Exception {
- // With IS_STRICT_TTL=false
- String ddl = "CREATE TABLE IF NOT EXISTS %s (" + "ID INTEGER NOT NULL
PRIMARY KEY, "
- + "NAME VARCHAR(50), NAME_VALUE BIGINT, UPDATED_DATE TIMESTAMP, " +
"EXPIRED BOOLEAN"
- + ") REPLICATION_SCOPE=%d, UPDATE_CACHE_FREQUENCY=0, "
- + "TTL='EXPIRED = TRUE', IS_STRICT_TTL=false " + "SPLIT ON (5, 7, 9)";
- executeTableCreation(sourceConnection, String.format(ddl, uniqueTableName,
1));
- executeTableCreation(targetConnection, String.format(ddl, uniqueTableName,
0));
-
- // Insert 10 rows on source: rows 1-3 marked as expired, rows 4-10 as live
- insertTestDataWithExpiredFlag(sourceConnection, uniqueTableName, 1, 3, true);
- insertTestDataWithExpiredFlag(sourceConnection, uniqueTableName, 4, 10, false);
-
- waitForReplication(targetConnection, uniqueTableName, 10);
-
- long sourceCount = TestUtil.getRowCount(sourceConnection, uniqueTableName);
- long targetCount = TestUtil.getRowCount(targetConnection, uniqueTableName);
- assertEquals("Source should see 10 live rows", 10, sourceCount);
- assertEquals("Target should see 10 live rows", 10, targetCount);
-
- // Introduce differences on 2 of the 7 live rows on target
- upsertRowsOnTarget(targetConnection, uniqueTableName, new int[] { 5, 8 },
- new String[] { "MODIFIED_5", "MODIFIED_8" });
-
- // Run sync tool, TTL-expired rows (1-3) should be skipped on both source and target
- Job job = runSyncTool(uniqueTableName);
- SyncCountersResult counters = getSyncCounters(job);
-
- validateSyncCounters(counters, 7, 7, 5, 2);
- validateMapperCounters(counters, 2, 2);
- }
-
@Test
public void testSyncTableWithConditionalTTLExpiredRowsCompact() throws Exception {
- // With IS_STRICT_TTL=false
String ddl = "CREATE TABLE IF NOT EXISTS %s (" + "ID INTEGER NOT NULL
PRIMARY KEY, "
- + "NAME VARCHAR(50), NAME_VALUE BIGINT, UPDATED_DATE TIMESTAMP, " +
"EXPIRED BOOLEAN"
- + ") REPLICATION_SCOPE=%d, UPDATE_CACHE_FREQUENCY=0, "
- + "TTL='EXPIRED = TRUE', IS_STRICT_TTL=false " + "SPLIT ON (5, 7, 9)";
+ + "NAME VARCHAR(50), NAME_VALUE BIGINT, UPDATED_DATE TIMESTAMP"
+ + ") REPLICATION_SCOPE=%d, UPDATE_CACHE_FREQUENCY=0, " + "TTL=1 SPLIT ON
(5, 7, 9)";
executeTableCreation(sourceConnection, String.format(ddl, uniqueTableName,
1));
executeTableCreation(targetConnection, String.format(ddl, uniqueTableName,
0));
- // Insert 10 rows on source: rows 1-3 marked as expired, rows 4-10 as live
- insertTestDataWithExpiredFlag(sourceConnection, uniqueTableName, 1, 3, true);
- insertTestDataWithExpiredFlag(sourceConnection, uniqueTableName, 4, 10, false);
-
- waitForReplication(targetConnection, uniqueTableName, 10);
+ // Insert 10 rows on source
+ insertTestData(sourceConnection, uniqueTableName, 1, 10);
- long sourceCount = TestUtil.getRowCount(sourceConnection, uniqueTableName);
- long targetCount = TestUtil.getRowCount(targetConnection, uniqueTableName);
- assertEquals("Source should see 10 live rows", 10, sourceCount);
- assertEquals("Target should see 10 live rows", 10, targetCount);
+ // Wait 2 seconds for TTL expiration (TTL=1 second)
+ Thread.sleep(2000);
- // Run sync tool, TTL-expired rows (1-3) should be skipped on both source and target
+ // Run sync tool - all rows should still be visible before compaction
Job job = runSyncTool(uniqueTableName);
SyncCountersResult counters = getSyncCounters(job);
- validateSyncCounters(counters, 7, 7, 7, 0);
+ validateSyncCounters(counters, 10, 10, 10, 0);
validateMapperCounters(counters, 4, 0);
+ // Flush and major compact target - this will physically remove expired rows
flushAndMajorCompact(CLUSTERS.getHBaseCluster2(), uniqueTableName);
- long sourceCountPostCompact = TestUtil.getRowCount(sourceConnection, uniqueTableName);
- long targetCountPostCompact = TestUtil.getRowCount(targetConnection, uniqueTableName);
- assertEquals("Source should see 10 live rows", 10, sourceCountPostCompact);
- assertEquals("Target should see 7 live rows", 7, targetCountPostCompact);
-
- // We shouldn't see expired rows even with --raw-scan flag
- Job job2 = runSyncTool(uniqueTableName, "--raw-scan");
+ // Run sync tool after compaction - should detect mismatch (source has rows, target doesn't)
Review Comment:
Shouldn't we handle this filtering in the sync tool coproc? Otherwise, we
will end up seeing a lot of false positives.
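For the plain time-based TTL used in this test, one option (just a sketch, not the actual sync tool code) would be to constrain the scan's time range so cells past their TTL are never returned, whether or not a major compaction has already removed them on one side; the conditional TTL case (`TTL='EXPIRED = TRUE'`) would instead need the expression evaluated before a row is hashed/compared.

```java
import java.io.IOException;
import org.apache.hadoop.hbase.client.Scan;

final class TtlAwareScanBuilder {
  // Sketch only: skip cells older than the table TTL on the server side so
  // both clusters agree on which rows are "live", independent of compaction.
  static Scan build(long ttlMillis) throws IOException {
    Scan scan = new Scan();
    long now = System.currentTimeMillis();
    // HBase TTL expiry is based on cell timestamp age, so restricting the
    // time range to [now - ttl, max] mirrors what compaction would remove.
    scan.setTimeRange(now - ttlMillis, Long.MAX_VALUE);
    return scan;
  }
}
```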
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]