stevenzwu commented on a change in pull request #2989:
URL: https://github.com/apache/iceberg/pull/2989#discussion_r697798211
##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
##########
@@ -170,114 +171,121 @@ public void testOverwriteTable() throws Exception {
public void testReplacePartitions() throws Exception {
Assume.assumeFalse("Flink unbounded streaming does not support overwrite
operation", isStreamingJob);
String tableName = "test_partition";
-
sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH
('write.format.default'='%s')",
tableName, format.name());
- Table partitionedTable =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-
- sql("INSERT INTO %s SELECT 1, 'a'", tableName);
- sql("INSERT INTO %s SELECT 2, 'b'", tableName);
- sql("INSERT INTO %s SELECT 3, 'c'", tableName);
-
- SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
- SimpleDataUtil.createRecord(1, "a"),
- SimpleDataUtil.createRecord(2, "b"),
- SimpleDataUtil.createRecord(3, "c")
- ));
-
- sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
- sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
-
- SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
- SimpleDataUtil.createRecord(5, "a"),
- SimpleDataUtil.createRecord(4, "b"),
- SimpleDataUtil.createRecord(3, "c")
- ));
-
- sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
-
- SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
- SimpleDataUtil.createRecord(6, "a"),
- SimpleDataUtil.createRecord(4, "b"),
- SimpleDataUtil.createRecord(3, "c")
- ));
-
- sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ try {
+ Table partitionedTable =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+
+ sql("INSERT INTO %s SELECT 1, 'a'", tableName);
+ sql("INSERT INTO %s SELECT 2, 'b'", tableName);
+ sql("INSERT INTO %s SELECT 3, 'c'", tableName);
+
+ SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+ SimpleDataUtil.createRecord(1, "a"),
+ SimpleDataUtil.createRecord(2, "b"),
+ SimpleDataUtil.createRecord(3, "c")
+ ));
+
+ sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
+ sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
+
+ SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+ SimpleDataUtil.createRecord(5, "a"),
+ SimpleDataUtil.createRecord(4, "b"),
+ SimpleDataUtil.createRecord(3, "c")
+ ));
+
+ sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
+
+ SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+ SimpleDataUtil.createRecord(6, "a"),
+ SimpleDataUtil.createRecord(4, "b"),
+ SimpleDataUtil.createRecord(3, "c")
+ ));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
}
@Test
public void testInsertIntoPartition() throws Exception {
String tableName = "test_insert_into_partition";
-
sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH
('write.format.default'='%s')",
tableName, format.name());
- Table partitionedTable =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-
- // Full partition.
- sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
- sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
- sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
-
- SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
- SimpleDataUtil.createRecord(1, "a"),
- SimpleDataUtil.createRecord(2, "a"),
- SimpleDataUtil.createRecord(3, "b")
- ));
-
- // Partial partition.
- sql("INSERT INTO %s SELECT 4, 'c'", tableName);
- sql("INSERT INTO %s SELECT 5, 'd'", tableName);
-
- SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
- SimpleDataUtil.createRecord(1, "a"),
- SimpleDataUtil.createRecord(2, "a"),
- SimpleDataUtil.createRecord(3, "b"),
- SimpleDataUtil.createRecord(4, "c"),
- SimpleDataUtil.createRecord(5, "d")
- ));
-
- sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ try {
+ Table partitionedTable =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+
+ // Full partition.
+ sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
+ sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
+ sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
+
+ SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+ SimpleDataUtil.createRecord(1, "a"),
+ SimpleDataUtil.createRecord(2, "a"),
+ SimpleDataUtil.createRecord(3, "b")
+ ));
+
+ // Partial partition.
+ sql("INSERT INTO %s SELECT 4, 'c'", tableName);
+ sql("INSERT INTO %s SELECT 5, 'd'", tableName);
+
+ SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+ SimpleDataUtil.createRecord(1, "a"),
+ SimpleDataUtil.createRecord(2, "a"),
+ SimpleDataUtil.createRecord(3, "b"),
+ SimpleDataUtil.createRecord(4, "c"),
+ SimpleDataUtil.createRecord(5, "d")
+ ));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
}
@Test
public void testHashDistributeMode() throws Exception {
String tableName = "test_hash_distribution_mode";
-
Map<String, String> tableProps = ImmutableMap.of(
"write.format.default", format.name(),
TableProperties.WRITE_DISTRIBUTION_MODE,
DistributionMode.HASH.modeName()
);
sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
tableName, toWithClause(tableProps));
- // Insert data set.
- sql("INSERT INTO %s VALUES " +
- "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
- "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
- "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
-
- Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
- SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
- SimpleDataUtil.createRecord(1, "aaa"),
- SimpleDataUtil.createRecord(1, "bbb"),
- SimpleDataUtil.createRecord(1, "ccc"),
- SimpleDataUtil.createRecord(2, "aaa"),
- SimpleDataUtil.createRecord(2, "bbb"),
- SimpleDataUtil.createRecord(2, "ccc"),
- SimpleDataUtil.createRecord(3, "aaa"),
- SimpleDataUtil.createRecord(3, "bbb"),
- SimpleDataUtil.createRecord(3, "ccc")
- ));
-
- Assert.assertEquals("There should be only 1 data file in partition 'aaa'",
1,
- SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data",
"aaa")).size());
- Assert.assertEquals("There should be only 1 data file in partition 'bbb'",
1,
- SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data",
"bbb")).size());
- Assert.assertEquals("There should be only 1 data file in partition 'ccc'",
1,
- SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data",
"ccc")).size());
-
- sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ try {
+ // Insert data set.
+ sql("INSERT INTO %s VALUES " +
+ "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
+ "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
+ "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+
+ Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+ SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
+ SimpleDataUtil.createRecord(1, "aaa"),
+ SimpleDataUtil.createRecord(1, "bbb"),
+ SimpleDataUtil.createRecord(1, "ccc"),
+ SimpleDataUtil.createRecord(2, "aaa"),
+ SimpleDataUtil.createRecord(2, "bbb"),
+ SimpleDataUtil.createRecord(2, "ccc"),
+ SimpleDataUtil.createRecord(3, "aaa"),
+ SimpleDataUtil.createRecord(3, "bbb"),
+ SimpleDataUtil.createRecord(3, "ccc")
+ ));
+
+ // Sometimes we will have more than one checkpoint if we pass the auto
checkpoint interval
Review comment:
Maybe expand the comment a little more to provide more context to the
reader.
```
// As a result, we may produce multiple snapshots, so we need to assert that
// we only produce 1 file for every partition per snapshot.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]