This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 81cd5b5e3f Flink: add serializer test for StatisticsOrRecord (#14381)
81cd5b5e3f is described below
commit 81cd5b5e3f71412e1580b7fda19e27fc988845a3
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Tue Oct 21 09:12:00 2025 -0700
Flink: add serializer test for StatisticsOrRecord (#14381)
---
.../flink/sink/shuffle/StatisticsOrRecord.java | 20 ++++++
.../iceberg/flink/sink/shuffle/Fixtures.java | 5 +-
.../shuffle/TestStatisticsOrRecordSerializer.java | 72 ++++++++++++++++++++++
3 files changed, 95 insertions(+), 2 deletions(-)
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsOrRecord.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsOrRecord.java
index bc28df2b0e..e02846af98 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsOrRecord.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsOrRecord.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
/**
@@ -109,4 +110,28 @@ public class StatisticsOrRecord implements Serializable {
         .add("record", record)
         .toString();
   }
+
+  /**
+   * Two instances are equal when their statistics and their records are both equal, compared
+   * null-safely. Record equality is delegated to the concrete {@link RowData} implementation, so
+   * rows holding the same values in different RowData classes may not compare equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof StatisticsOrRecord)) {
+      return false;
+    }
+
+    StatisticsOrRecord that = (StatisticsOrRecord) o;
+    return Objects.equal(statistics, that.statistics) && Objects.equal(record, that.record);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(statistics, record);
+  }
 }
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/Fixtures.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/Fixtures.java
index 5910bd6855..2673bc6092 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/Fixtures.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/Fixtures.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.flink.sink.shuffle;
import java.util.Comparator;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -44,7 +43,7 @@ class Fixtures {
           Types.NestedField.optional(1, "id", Types.StringType.get()),
           Types.NestedField.optional(2, "number", Types.IntegerType.get()));
   public static final RowType ROW_TYPE = RowType.of(new VarCharType(), new IntType());
-  public static final TypeSerializer<RowData> ROW_SERIALIZER = new RowDataSerializer(ROW_TYPE);
+  public static final RowDataSerializer ROW_SERIALIZER = new RowDataSerializer(ROW_TYPE);
   public static final RowDataWrapper ROW_WRAPPER = new RowDataWrapper(ROW_TYPE, SCHEMA.asStruct());
   public static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();
   public static final Comparator<StructLike> SORT_ORDER_COMPARTOR =
@@ -57,6 +56,8 @@ class Fixtures {
       new GlobalStatisticsSerializer(SORT_KEY_SERIALIZER);
   public static final CompletedStatisticsSerializer COMPLETED_STATISTICS_SERIALIZER =
       new CompletedStatisticsSerializer(SORT_KEY_SERIALIZER);
+  public static final StatisticsOrRecordSerializer STATISTICS_OR_RECORD_SERIALIZER =
+      new StatisticsOrRecordSerializer(GLOBAL_STATISTICS_SERIALIZER, ROW_SERIALIZER);
 
   public static final SortKey SORT_KEY = new SortKey(SCHEMA, SORT_ORDER);
   public static final Map<String, SortKey> CHAR_KEYS = createCharKeys();
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestStatisticsOrRecordSerializer.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestStatisticsOrRecordSerializer.java
new file mode 100644
index 0000000000..4ad6d38447
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestStatisticsOrRecordSerializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.CHAR_KEYS;
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.NUM_SUBTASKS;
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.ROW_SERIALIZER;
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.SORT_ORDER_COMPARTOR;
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.STATISTICS_OR_RECORD_SERIALIZER;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * Runs Flink's {@link SerializerTestBase} checks against {@link StatisticsOrRecordSerializer} with
+ * test data covering records as well as map-assignment and range-bounds global statistics.
+ */
+public class TestStatisticsOrRecordSerializer extends SerializerTestBase<StatisticsOrRecord> {
+
+  @Override
+  protected TypeSerializer<StatisticsOrRecord> createSerializer() {
+    return STATISTICS_OR_RECORD_SERIALIZER;
+  }
+
+  @Override
+  protected int getLength() {
+    // -1 marks StatisticsOrRecord as a variable-length type
+    return -1;
+  }
+
+  @Override
+  protected Class<StatisticsOrRecord> getTypeClass() {
+    return StatisticsOrRecord.class;
+  }
+
+  @Override
+  protected StatisticsOrRecord[] getTestData() {
+    return new StatisticsOrRecord[] {
+      recordOf("key1", 100),
+      // record with a null field
+      recordOf(null, 200),
+      // map global statistics
+      StatisticsOrRecord.fromStatistics(
+          GlobalStatistics.fromMapAssignment(
+              1L,
+              MapAssignment.fromKeyFrequency(
+                  NUM_SUBTASKS,
+                  ImmutableMap.of(CHAR_KEYS.get("a"), 1L, CHAR_KEYS.get("b"), 2L),
+                  0.0d,
+                  SORT_ORDER_COMPARTOR))),
+      // range bound global statistics
+      StatisticsOrRecord.fromStatistics(
+          GlobalStatistics.fromRangeBounds(
+              2L, new SortKey[] {CHAR_KEYS.get("a"), CHAR_KEYS.get("b")}))
+    };
+  }
+
+  /**
+   * Builds a record datum backed by {@code BinaryRowData}, since RowDataSerializer deserializes to
+   * BinaryRowData and the round-tripped value must compare equal to the original.
+   */
+  private static StatisticsOrRecord recordOf(String key, Integer number) {
+    GenericRowData row = GenericRowData.of(StringData.fromString(key), number);
+    // toBinaryRow may hand back a BinaryRowData it reuses on later calls, so copy it to keep each
+    // datum independent of the shared Fixtures.ROW_SERIALIZER.
+    return StatisticsOrRecord.fromRecord(ROW_SERIALIZER.toBinaryRow(row).copy());
+  }
+}