This is an automated email from the ASF dual-hosted git repository.

ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 283b3b81d [client] Add result verification for the typed partition IT cases (#1714)
283b3b81d is described below

commit 283b3b81d9eb30ebe871952a3c0a6ef1b16761b3
Author: Jark Wu <[email protected]>
AuthorDate: Wed Sep 17 15:51:26 2025 +0800

    [client] Add result verification for the typed partition IT cases (#1714)
    
    * [client] Add result verification for the typed partition IT cases
    
    * fix java8
---
 ...TCase.java => TypedPartitionedTableITCase.java} | 68 ++++++++--------------
 .../{types => utils}/PartitionNameConverters.java  |  4 +-
 .../org/apache/fluss/utils/PartitionUtils.java     |  9 ++-
 3 files changed, 30 insertions(+), 51 deletions(-)

diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/NewPartitionedTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/TypedPartitionedTableITCase.java
similarity index 78%
rename from fluss-client/src/test/java/org/apache/fluss/client/table/NewPartitionedTableITCase.java
rename to fluss-client/src/test/java/org/apache/fluss/client/table/TypedPartitionedTableITCase.java
index 230d0ee52..f6132e1f9 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/NewPartitionedTableITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/TypedPartitionedTableITCase.java
@@ -48,24 +48,21 @@ import org.apache.fluss.types.TinyIntType;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.fluss.testutils.DataTestUtils.row;
-import static org.apache.fluss.utils.PartitionUtils.convertValueOfType;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT case for Fluss partitioned table supporting partition key of different 
types. */
-class NewPartitionedTableITCase extends ClientToServerITCaseBase {
-    Schema.Builder schemaBuilder =
+class TypedPartitionedTableITCase extends ClientToServerITCaseBase {
+    private static final Schema.Builder schemaBuilder =
             Schema.newBuilder()
                     .column("a", new StringType())
                     .column("char", new CharType())
-                    .column("binary", new BinaryType())
+                    .column("binary", new BinaryType(6))
                     .column("boolean", new BooleanType())
                     .column("bytes", new BytesType())
                     .column("tinyInt", new TinyIntType())
@@ -75,12 +72,12 @@ class NewPartitionedTableITCase extends 
ClientToServerITCaseBase {
                     .column("date", new DateType())
                     .column("float", new FloatType())
                     .column("double", new DoubleType())
-                    .column("time", new TimeType())
+                    .column("time", new TimeType(3))
                     .column("timeStampNTZ", new TimestampType())
                     .column("timeStampLTZ", new LocalZonedTimestampType());
 
-    Schema schema = schemaBuilder.build();
-    DataTypeRoot[] allPartitionKeyTypes =
+    private static final Schema schema = schemaBuilder.build();
+    private static final DataTypeRoot[] allPartitionKeyTypes =
             new DataTypeRoot[] {
                 DataTypeRoot.STRING,
                 DataTypeRoot.CHAR,
@@ -99,7 +96,7 @@ class NewPartitionedTableITCase extends 
ClientToServerITCaseBase {
                 DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
             };
 
-    Object[] allPartitionKeyValues =
+    private static final Object[] allPartitionKeyValues =
             new Object[] {
                 BinaryString.fromString("a"),
                 BinaryString.fromString("F"),
@@ -118,7 +115,7 @@ class NewPartitionedTableITCase extends 
ClientToServerITCaseBase {
                 TimestampLtz.fromEpochMillis(1748662955428L) // 
TIMESTAMP_WITH_LOCAL_TIME_ZONE
             };
 
-    Schema.Column[] extraColumn =
+    private static final Schema.Column[] extraColumn =
             new Schema.Column[] {
                 new Schema.Column("a", new StringType()),
                 new Schema.Column("char", new CharType()),
@@ -137,7 +134,7 @@ class NewPartitionedTableITCase extends 
ClientToServerITCaseBase {
                 new Schema.Column("timeStampLTZ", new 
LocalZonedTimestampType())
             };
 
-    List<String> result =
+    private static final List<String> result =
             Arrays.asList(
                     "a",
                     "F",
@@ -183,52 +180,35 @@ class NewPartitionedTableITCase extends 
ClientToServerITCaseBase {
 
     @Test
     public void testMultipleTypedPartitionedTable() throws Exception {
-
         for (int i = 0; i < allPartitionKeyTypes.length; i++) {
             String partitionKey = extraColumn[i].getName();
             TablePath tablePath =
-                    TablePath.of("test_part_db_" + i, 
"test_static_partitioned_pk_table_" + i);
+                    TablePath.of("fluss", "test_static_partitioned_pk_table_" 
+ partitionKey);
             createPartitionedTable(tablePath, partitionKey);
-            String partitionValue =
-                    convertValueOfType(allPartitionKeyValues[i], 
allPartitionKeyTypes[i]);
-
-            admin.createPartition(tablePath, newPartitionSpec(partitionKey, 
partitionValue), true)
-                    .get();
-
-            Map<String, Long> partitionIdByNames =
-                    
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath, 1);
-
-            List<PartitionInfo> partitionInfos = 
admin.listPartitionInfos(tablePath).get();
-            List<String> expectedPartitions = new 
ArrayList<>(partitionIdByNames.keySet());
-            assertThat(
-                            partitionInfos.stream()
-                                    .map(PartitionInfo::getPartitionName)
-                                    .collect(Collectors.toList()))
-                    .containsExactlyInAnyOrderElementsOf(expectedPartitions);
 
+            // append a record to the table will dynamically create the 
corresponding partition
             Table table = conn.getTable(tablePath);
             AppendWriter appendWriter = table.newAppend().createWriter();
-            Map<Long, List<InternalRow>> expectPartitionAppendRows = new 
HashMap<>();
-            for (String partition : partitionIdByNames.keySet()) {
-                for (int j = 0; j < allPartitionKeyValues.length; j++) {
-                    InternalRow row = row(allPartitionKeyValues);
-                    appendWriter.append(row);
-                    expectPartitionAppendRows
-                            .computeIfAbsent(
-                                    partitionIdByNames.get(partition), k -> 
new ArrayList<>())
-                            .add(row);
-                }
-            }
+            InternalRow row = row(allPartitionKeyValues);
+            appendWriter.append(row);
             appendWriter.flush();
 
-            
assertThat(admin.listPartitionInfos(tablePath).get().get(0).getPartitionName())
-                    .isEqualTo(result.get(i));
+            List<PartitionInfo> actualPartitions = 
admin.listPartitionInfos(tablePath).get();
+            assertThat(actualPartitions).hasSize(1);
+            PartitionInfo actualPartition = actualPartitions.get(0);
+            
assertThat(actualPartition.getPartitionName()).isEqualTo(result.get(i));
+
+            Map<Long, List<InternalRow>> expectPartitionAppendRows = new 
HashMap<>();
+            expectPartitionAppendRows.put(actualPartition.getPartitionId(), 
Arrays.asList(row));
+            // assert result
+            verifyPartitionLogs(table, schema.getRowType(), 
expectPartitionAppendRows);
+            admin.dropTable(tablePath, true).get();
         }
     }
 
     private void createPartitionedTable(TablePath tablePath, String 
partitionKey) throws Exception {
         TableDescriptor partitionTableDescriptor =
                 
TableDescriptor.builder().schema(schema).partitionedBy(partitionKey).build();
-        createTable(tablePath, partitionTableDescriptor, false);
+        createTable(tablePath, partitionTableDescriptor, true);
     }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/types/PartitionNameConverters.java b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
similarity index 98%
rename from fluss-common/src/main/java/org/apache/fluss/types/PartitionNameConverters.java
rename to fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
index 414465003..e12a68d21 100644
--- a/fluss-common/src/main/java/org/apache/fluss/types/PartitionNameConverters.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.types;
+package org.apache.fluss.utils;
 
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
index 7ba2197fa..d853b7596 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
@@ -27,7 +27,6 @@ import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.types.DataTypeRoot;
-import org.apache.fluss.types.PartitionNameConverters;
 
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
@@ -181,6 +180,10 @@ public class PartitionUtils {
                 Integer intValue = (Integer) value;
                 stringPartitionKey = intValue.toString();
                 break;
+            case BIGINT:
+                Long bigIntValue = (Long) value;
+                stringPartitionKey = bigIntValue.toString();
+                break;
             case DATE:
                 Integer dateValue = (Integer) value;
                 stringPartitionKey = 
PartitionNameConverters.dayToString(dateValue);
@@ -205,10 +208,6 @@ public class PartitionUtils {
                 TimestampNtz timeStampNTZValue = (TimestampNtz) value;
                 stringPartitionKey = 
PartitionNameConverters.timestampToString(timeStampNTZValue);
                 break;
-            case BIGINT:
-                Long bigIntValue = (Long) value;
-                stringPartitionKey = bigIntValue.toString();
-                break;
             default:
                 throw new IllegalArgumentException("Unsupported DataTypeRoot: 
" + type);
         }

Reply via email to