xx789633 commented on code in PR #1658:
URL: https://github.com/apache/fluss/pull/1658#discussion_r2332244449
##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java:
##########
@@ -48,6 +54,277 @@ protected static void beforeAll() {
execEnv.enableCheckpointing(1000);
}
+ @Test
+ void testPosDeleteCompaction() throws Exception {
+ JobClient jobClient = buildTieringJob(execEnv);
+ try {
+ TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1");
+ long t1Id = createPkTable(t1, true);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+ List<InternalRow> rows =
+ Arrays.asList(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 1,
+ 1 + 400L,
+ 500.1f,
+ 600.0d,
+ "v1",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273400L),
+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L,
8000),
+ new byte[] {5, 6, 7, 8},
+ TypeUtils.castFromString("2023-10-25",
DataTypes.DATE()),
+ TypeUtils.castFromString("09:30:00.0",
DataTypes.TIME()),
+ BinaryString.fromString("abc"),
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9,
10}));
+ writeRows(t1, rows, false);
+ assertReplicaStatus(t1Bucket, 1);
+
+ rows =
+ Arrays.asList(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 2,
+ 2 + 400L,
+ 500.1f,
+ 600.0d,
+ "v1",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273400L),
+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L,
8000),
+ new byte[] {5, 6, 7, 8},
+ TypeUtils.castFromString("2023-10-25",
DataTypes.DATE()),
+ TypeUtils.castFromString("09:30:00.0",
DataTypes.TIME()),
+ BinaryString.fromString("abc"),
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9,
10}));
+ writeRows(t1, rows, false);
+ assertReplicaStatus(t1Bucket, 2);
+
+ // add pos-delete
+ rows =
+ Arrays.asList(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 3,
+ 3 + 400L,
+ 500.1f,
+ 600.0d,
+ "v1",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273400L),
+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L,
8000),
+ new byte[] {5, 6, 7, 8},
+ TypeUtils.castFromString("2023-10-25",
DataTypes.DATE()),
+ TypeUtils.castFromString("09:30:00.0",
DataTypes.TIME()),
+ BinaryString.fromString("abc"),
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9,
10}),
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 3,
+ 3 + 400L,
+ 500.1f,
+ 600.0d,
+ "v2",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273400L),
+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L,
8000),
+ new byte[] {5, 6, 7, 8},
+ TypeUtils.castFromString("2023-10-25",
DataTypes.DATE()),
+ TypeUtils.castFromString("09:30:00.0",
DataTypes.TIME()),
+ BinaryString.fromString("abc"),
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9,
10}));
+ writeRows(t1, rows, false);
+ // one UPDATE_BEFORE and one UPDATE_AFTER
+ assertReplicaStatus(t1Bucket, 5);
+ checkFileStatusInIcebergTable(t1, 3, true);
+
+ // trigger compaction
+ rows =
+ Arrays.asList(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 4,
+ 4 + 400L,
+ 500.1f,
+ 600.0d,
+ "v1",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273400L),
+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L,
8000),
+ new byte[] {5, 6, 7, 8},
+ TypeUtils.castFromString("2023-10-25",
DataTypes.DATE()),
+ TypeUtils.castFromString("09:30:00.0",
DataTypes.TIME()),
+ BinaryString.fromString("abc"),
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9,
10}));
+ writeRows(t1, rows, false);
+ assertReplicaStatus(t1Bucket, 6);
+ checkFileStatusInIcebergTable(t1, 2, false);
+ } finally {
+ jobClient.cancel().get();
+ }
+ }
+
+ @Test
+ void testPosDeleteDuringCompaction() throws Exception {
+ JobClient jobClient = buildTieringJob(execEnv);
+ try {
+ TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_2");
+ long t1Id = createPkTable(t1, true);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+ List<InternalRow> rows =
+ Arrays.asList(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 1,
+ 1 + 400L,
+ 500.1f,
+ 600.0d,
+ "v1",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273400L),
+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L,
8000),
+ new byte[] {5, 6, 7, 8},
+ TypeUtils.castFromString("2023-10-25",
DataTypes.DATE()),
+ TypeUtils.castFromString("09:30:00.0",
DataTypes.TIME()),
+ BinaryString.fromString("abc"),
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9,
10}));
+ writeRows(t1, rows, false);
+ assertReplicaStatus(t1Bucket, 1);
+
+ rows =
+ Arrays.asList(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 2,
+ 2 + 400L,
+ 500.1f,
+ 600.0d,
+ "v1",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273400L),
+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L,
8000),
+ new byte[] {5, 6, 7, 8},
+ TypeUtils.castFromString("2023-10-25",
DataTypes.DATE()),
+ TypeUtils.castFromString("09:30:00.0",
DataTypes.TIME()),
+ BinaryString.fromString("abc"),
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9,
10}));
+ writeRows(t1, rows, false);
+ assertReplicaStatus(t1Bucket, 2);
+
+ rows =
+ Arrays.asList(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 3,
+ 3 + 400L,
+ 500.1f,
+ 600.0d,
+ "v1",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273400L),
+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L,
8000),
+ new byte[] {5, 6, 7, 8},
+ TypeUtils.castFromString("2023-10-25",
DataTypes.DATE()),
+ TypeUtils.castFromString("09:30:00.0",
DataTypes.TIME()),
+ BinaryString.fromString("abc"),
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9,
10}));
+ writeRows(t1, rows, false);
+ assertReplicaStatus(t1Bucket, 3);
+
+ // add pos-delete and trigger compaction
+ rows =
+ Arrays.asList(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 4,
+ 4 + 400L,
+ 500.1f,
+ 600.0d,
+ "v1",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273400L),
+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L,
8000),
+ new byte[] {5, 6, 7, 8},
+ TypeUtils.castFromString("2023-10-25",
DataTypes.DATE()),
+ TypeUtils.castFromString("09:30:00.0",
DataTypes.TIME()),
+ BinaryString.fromString("abc"),
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9,
10}),
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 4,
+ 4 + 400L,
+ 500.1f,
+ 600.0d,
+ "v2",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273400L),
+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L,
8000),
+ new byte[] {5, 6, 7, 8},
+ TypeUtils.castFromString("2023-10-25",
DataTypes.DATE()),
+ TypeUtils.castFromString("09:30:00.0",
DataTypes.TIME()),
+ BinaryString.fromString("abc"),
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9,
10}));
+ writeRows(t1, rows, false);
+ assertReplicaStatus(t1Bucket, 6);
+            // The rewritten files should fail to commit due to the conflict; add a check for that here.
Review Comment:
Agreed. I left this note as a reminder to add more checks. Should we also
verify that the latest snapshot version is correct, and that the rewritten
files have been deleted?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]