This is an automated email from the ASF dual-hosted git repository.
ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 283b3b81d [client] Add result verification for the typed partition IT cases (#1714)
283b3b81d is described below
commit 283b3b81d9eb30ebe871952a3c0a6ef1b16761b3
Author: Jark Wu <[email protected]>
AuthorDate: Wed Sep 17 15:51:26 2025 +0800
[client] Add result verification for the typed partition IT cases (#1714)
* [client] Add result verification for the typed partition IT cases
* fix java8
---
...TCase.java => TypedPartitionedTableITCase.java} | 68 ++++++++--------------
.../{types => utils}/PartitionNameConverters.java | 4 +-
.../org/apache/fluss/utils/PartitionUtils.java | 9 ++-
3 files changed, 30 insertions(+), 51 deletions(-)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/NewPartitionedTableITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/TypedPartitionedTableITCase.java
similarity index 78%
rename from
fluss-client/src/test/java/org/apache/fluss/client/table/NewPartitionedTableITCase.java
rename to
fluss-client/src/test/java/org/apache/fluss/client/table/TypedPartitionedTableITCase.java
index 230d0ee52..f6132e1f9 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/NewPartitionedTableITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/TypedPartitionedTableITCase.java
@@ -48,24 +48,21 @@ import org.apache.fluss.types.TinyIntType;
import org.junit.jupiter.api.Test;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.fluss.testutils.DataTestUtils.row;
-import static org.apache.fluss.utils.PartitionUtils.convertValueOfType;
import static org.assertj.core.api.Assertions.assertThat;
/** IT case for Fluss partitioned table supporting partition key of different types. */
-class NewPartitionedTableITCase extends ClientToServerITCaseBase {
- Schema.Builder schemaBuilder =
+class TypedPartitionedTableITCase extends ClientToServerITCaseBase {
+ private static final Schema.Builder schemaBuilder =
Schema.newBuilder()
.column("a", new StringType())
.column("char", new CharType())
- .column("binary", new BinaryType())
+ .column("binary", new BinaryType(6))
.column("boolean", new BooleanType())
.column("bytes", new BytesType())
.column("tinyInt", new TinyIntType())
@@ -75,12 +72,12 @@ class NewPartitionedTableITCase extends
ClientToServerITCaseBase {
.column("date", new DateType())
.column("float", new FloatType())
.column("double", new DoubleType())
- .column("time", new TimeType())
+ .column("time", new TimeType(3))
.column("timeStampNTZ", new TimestampType())
.column("timeStampLTZ", new LocalZonedTimestampType());
- Schema schema = schemaBuilder.build();
- DataTypeRoot[] allPartitionKeyTypes =
+ private static final Schema schema = schemaBuilder.build();
+ private static final DataTypeRoot[] allPartitionKeyTypes =
new DataTypeRoot[] {
DataTypeRoot.STRING,
DataTypeRoot.CHAR,
@@ -99,7 +96,7 @@ class NewPartitionedTableITCase extends
ClientToServerITCaseBase {
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
};
- Object[] allPartitionKeyValues =
+ private static final Object[] allPartitionKeyValues =
new Object[] {
BinaryString.fromString("a"),
BinaryString.fromString("F"),
@@ -118,7 +115,7 @@ class NewPartitionedTableITCase extends
ClientToServerITCaseBase {
TimestampLtz.fromEpochMillis(1748662955428L) //
TIMESTAMP_WITH_LOCAL_TIME_ZONE
};
- Schema.Column[] extraColumn =
+ private static final Schema.Column[] extraColumn =
new Schema.Column[] {
new Schema.Column("a", new StringType()),
new Schema.Column("char", new CharType()),
@@ -137,7 +134,7 @@ class NewPartitionedTableITCase extends
ClientToServerITCaseBase {
new Schema.Column("timeStampLTZ", new
LocalZonedTimestampType())
};
- List<String> result =
+ private static final List<String> result =
Arrays.asList(
"a",
"F",
@@ -183,52 +180,35 @@ class NewPartitionedTableITCase extends
ClientToServerITCaseBase {
@Test
public void testMultipleTypedPartitionedTable() throws Exception {
-
for (int i = 0; i < allPartitionKeyTypes.length; i++) {
String partitionKey = extraColumn[i].getName();
TablePath tablePath =
- TablePath.of("test_part_db_" + i,
"test_static_partitioned_pk_table_" + i);
+ TablePath.of("fluss", "test_static_partitioned_pk_table_"
+ partitionKey);
createPartitionedTable(tablePath, partitionKey);
- String partitionValue =
- convertValueOfType(allPartitionKeyValues[i],
allPartitionKeyTypes[i]);
-
- admin.createPartition(tablePath, newPartitionSpec(partitionKey,
partitionValue), true)
- .get();
-
- Map<String, Long> partitionIdByNames =
-
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath, 1);
-
- List<PartitionInfo> partitionInfos =
admin.listPartitionInfos(tablePath).get();
- List<String> expectedPartitions = new
ArrayList<>(partitionIdByNames.keySet());
- assertThat(
- partitionInfos.stream()
- .map(PartitionInfo::getPartitionName)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrderElementsOf(expectedPartitions);
+ // append a record to the table will dynamically create the corresponding partition
Table table = conn.getTable(tablePath);
AppendWriter appendWriter = table.newAppend().createWriter();
- Map<Long, List<InternalRow>> expectPartitionAppendRows = new
HashMap<>();
- for (String partition : partitionIdByNames.keySet()) {
- for (int j = 0; j < allPartitionKeyValues.length; j++) {
- InternalRow row = row(allPartitionKeyValues);
- appendWriter.append(row);
- expectPartitionAppendRows
- .computeIfAbsent(
- partitionIdByNames.get(partition), k ->
new ArrayList<>())
- .add(row);
- }
- }
+ InternalRow row = row(allPartitionKeyValues);
+ appendWriter.append(row);
appendWriter.flush();
-
assertThat(admin.listPartitionInfos(tablePath).get().get(0).getPartitionName())
- .isEqualTo(result.get(i));
+ List<PartitionInfo> actualPartitions =
admin.listPartitionInfos(tablePath).get();
+ assertThat(actualPartitions).hasSize(1);
+ PartitionInfo actualPartition = actualPartitions.get(0);
+
assertThat(actualPartition.getPartitionName()).isEqualTo(result.get(i));
+
+ Map<Long, List<InternalRow>> expectPartitionAppendRows = new
HashMap<>();
+ expectPartitionAppendRows.put(actualPartition.getPartitionId(),
Arrays.asList(row));
+ // assert result
+ verifyPartitionLogs(table, schema.getRowType(),
expectPartitionAppendRows);
+ admin.dropTable(tablePath, true).get();
}
}
private void createPartitionedTable(TablePath tablePath, String
partitionKey) throws Exception {
TableDescriptor partitionTableDescriptor =
TableDescriptor.builder().schema(schema).partitionedBy(partitionKey).build();
- createTable(tablePath, partitionTableDescriptor, false);
+ createTable(tablePath, partitionTableDescriptor, true);
}
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/types/PartitionNameConverters.java
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
similarity index 98%
rename from
fluss-common/src/main/java/org/apache/fluss/types/PartitionNameConverters.java
rename to
fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
index 414465003..e12a68d21 100644
---
a/fluss-common/src/main/java/org/apache/fluss/types/PartitionNameConverters.java
+++
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.fluss.types;
+package org.apache.fluss.utils;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
diff --git
a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
index 7ba2197fa..d853b7596 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
@@ -27,7 +27,6 @@ import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.types.DataTypeRoot;
-import org.apache.fluss.types.PartitionNameConverters;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
@@ -181,6 +180,10 @@ public class PartitionUtils {
Integer intValue = (Integer) value;
stringPartitionKey = intValue.toString();
break;
+ case BIGINT:
+ Long bigIntValue = (Long) value;
+ stringPartitionKey = bigIntValue.toString();
+ break;
case DATE:
Integer dateValue = (Integer) value;
stringPartitionKey =
PartitionNameConverters.dayToString(dateValue);
@@ -205,10 +208,6 @@ public class PartitionUtils {
TimestampNtz timeStampNTZValue = (TimestampNtz) value;
stringPartitionKey =
PartitionNameConverters.timestampToString(timeStampNTZValue);
break;
- case BIGINT:
- Long bigIntValue = (Long) value;
- stringPartitionKey = bigIntValue.toString();
- break;
default:
throw new IllegalArgumentException("Unsupported DataTypeRoot:
" + type);
}