This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 81cd5b5e3f Flink: add serializer test for StatisticsOrRecord (#14381)
81cd5b5e3f is described below

commit 81cd5b5e3f71412e1580b7fda19e27fc988845a3
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Tue Oct 21 09:12:00 2025 -0700

    Flink: add serializer test for StatisticsOrRecord (#14381)
---
 .../flink/sink/shuffle/StatisticsOrRecord.java     | 20 ++++++
 .../iceberg/flink/sink/shuffle/Fixtures.java       |  5 +-
 .../shuffle/TestStatisticsOrRecordSerializer.java  | 72 ++++++++++++++++++++++
 3 files changed, 95 insertions(+), 2 deletions(-)

diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsOrRecord.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsOrRecord.java
index bc28df2b0e..e02846af98 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsOrRecord.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsOrRecord.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 /**
@@ -109,4 +110,23 @@ public class StatisticsOrRecord implements Serializable {
         .add("record", record)
         .toString();
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof StatisticsOrRecord)) {
+      return false;
+    }
+
+    StatisticsOrRecord that = (StatisticsOrRecord) o;
+    return Objects.equal(statistics, that.statistics) && Objects.equal(record, 
that.record);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(statistics, record);
+  }
 }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/Fixtures.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/Fixtures.java
index 5910bd6855..2673bc6092 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/Fixtures.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/Fixtures.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.flink.sink.shuffle;
 import java.util.Comparator;
 import java.util.Map;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -44,7 +43,7 @@ class Fixtures {
           Types.NestedField.optional(1, "id", Types.StringType.get()),
           Types.NestedField.optional(2, "number", Types.IntegerType.get()));
   public static final RowType ROW_TYPE = RowType.of(new VarCharType(), new 
IntType());
-  public static final TypeSerializer<RowData> ROW_SERIALIZER = new 
RowDataSerializer(ROW_TYPE);
+  public static final RowDataSerializer ROW_SERIALIZER = new 
RowDataSerializer(ROW_TYPE);
   public static final RowDataWrapper ROW_WRAPPER = new 
RowDataWrapper(ROW_TYPE, SCHEMA.asStruct());
   public static final SortOrder SORT_ORDER = 
SortOrder.builderFor(SCHEMA).asc("id").build();
   public static final Comparator<StructLike> SORT_ORDER_COMPARTOR =
@@ -57,6 +56,8 @@ class Fixtures {
       new GlobalStatisticsSerializer(SORT_KEY_SERIALIZER);
   public static final CompletedStatisticsSerializer 
COMPLETED_STATISTICS_SERIALIZER =
       new CompletedStatisticsSerializer(SORT_KEY_SERIALIZER);
+  public static final StatisticsOrRecordSerializer 
STATISTICS_OR_RECORD_SERIALIZER =
+      new StatisticsOrRecordSerializer(GLOBAL_STATISTICS_SERIALIZER, 
ROW_SERIALIZER);
 
   public static final SortKey SORT_KEY = new SortKey(SCHEMA, SORT_ORDER);
   public static final Map<String, SortKey> CHAR_KEYS = createCharKeys();
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestStatisticsOrRecordSerializer.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestStatisticsOrRecordSerializer.java
new file mode 100644
index 0000000000..4ad6d38447
--- /dev/null
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestStatisticsOrRecordSerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.CHAR_KEYS;
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.NUM_SUBTASKS;
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.ROW_SERIALIZER;
+import static 
org.apache.iceberg.flink.sink.shuffle.Fixtures.SORT_ORDER_COMPARTOR;
+import static 
org.apache.iceberg.flink.sink.shuffle.Fixtures.STATISTICS_OR_RECORD_SERIALIZER;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class TestStatisticsOrRecordSerializer extends 
SerializerTestBase<StatisticsOrRecord> {
+
+  @Override
+  protected TypeSerializer<StatisticsOrRecord> createSerializer() {
+    return STATISTICS_OR_RECORD_SERIALIZER;
+  }
+
+  @Override
+  protected int getLength() {
+    return -1;
+  }
+
+  @Override
+  protected Class<StatisticsOrRecord> getTypeClass() {
+    return StatisticsOrRecord.class;
+  }
+
+  @Override
+  protected StatisticsOrRecord[] getTestData() {
+    return new StatisticsOrRecord[] {
+      // Use BinaryRowData for testing as RowDataSerializer deserializes to 
BinaryRowData
+      StatisticsOrRecord.fromRecord(
+          
ROW_SERIALIZER.toBinaryRow(GenericRowData.of(StringData.fromString("key1"), 
100))),
+      // map global statistics
+      StatisticsOrRecord.fromStatistics(
+          GlobalStatistics.fromMapAssignment(
+              1L,
+              MapAssignment.fromKeyFrequency(
+                  NUM_SUBTASKS,
+                  ImmutableMap.of(CHAR_KEYS.get("a"), 1L, CHAR_KEYS.get("b"), 
2L),
+                  0.0d,
+                  SORT_ORDER_COMPARTOR))),
+      // range bound global statistics
+      StatisticsOrRecord.fromStatistics(
+          GlobalStatistics.fromRangeBounds(
+              2L, new SortKey[] {CHAR_KEYS.get("a"), CHAR_KEYS.get("b")}))
+    };
+  }
+}

Reply via email to