This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 4ddd90cc69eed8178901c44222e6ebbc06bb87a1
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Wed Jan 12 19:38:20 2022 +0800

    [FLINK-25628] Introduce FieldStats
    
    Co-authored-by: tsreaper <tsreape...@gmail.com>
---
 .../flink/table/store/file/stats/FieldStats.java   |  70 +++++++++++++++
 .../file/stats/FieldStatsArraySerializer.java      | 100 +++++++++++++++++++++
 .../store/file/stats/FieldStatsCollector.java      |  82 +++++++++++++++++
 .../file/stats/FieldStatsArraySerializerTest.java  |  62 +++++++++++++
 .../store/file/stats/FieldStatsCollectorTest.java  |  98 ++++++++++++++++++++
 5 files changed, 412 insertions(+)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java
new file mode 100644
index 0000000..62843bf
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * Statistics for each field: the minimum value, the maximum value and the number of nulls.
+ *
+ * <p>The minimum and maximum may be {@code null}: {@link FieldStatsCollector}, for example,
+ * leaves them unset for a field until it sees a non-null {@link Comparable} value of it.
+ */
+public class FieldStats {
+
+    @Nullable private final Object minValue;
+    @Nullable private final Object maxValue;
+    private final long nullCount;
+
+    /**
+     * Creates the statistics of a single field.
+     *
+     * @param minValue minimum value of the field, may be {@code null}
+     * @param maxValue maximum value of the field, may be {@code null}
+     * @param nullCount number of null values of the field
+     */
+    public FieldStats(@Nullable Object minValue, @Nullable Object maxValue, long nullCount) {
+        this.minValue = minValue;
+        this.maxValue = maxValue;
+        this.nullCount = nullCount;
+    }
+
+    /** Returns the minimum value, or {@code null} if none is recorded. */
+    @Nullable
+    public Object minValue() {
+        return minValue;
+    }
+
+    /** Returns the maximum value, or {@code null} if none is recorded. */
+    @Nullable
+    public Object maxValue() {
+        return maxValue;
+    }
+
+    /** Returns the number of null values. */
+    public long nullCount() {
+        return nullCount;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FieldStats)) {
+            return false;
+        }
+        FieldStats that = (FieldStats) o;
+        return nullCount == that.nullCount
+                && Objects.equals(minValue, that.minValue)
+                && Objects.equals(maxValue, that.maxValue);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(minValue, maxValue, nullCount);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{%s, %s, %d}", minValue, maxValue, nullCount);
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
new file mode 100644
index 0000000..6a7cc7f
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/**
+ * Serializer for array of {@link FieldStats}.
+ *
+ * <p>The array is written as a row of three fields, see {@link #schema(RowType)}: a row holding
+ * the minimum values, a row holding the maximum values and an array holding the null counts.
+ */
+public class FieldStatsArraySerializer extends ObjectSerializer<FieldStats[]> {
+
+    private final RowData.FieldGetter[] fieldGetters;
+
+    public FieldStatsArraySerializer(RowType rowType) {
+        super(schema(rowType));
+        this.fieldGetters = createFieldGetters(toAllFieldsNullableRowType(rowType));
+    }
+
+    /**
+     * Converts the statistics into a row following {@link #schema(RowType)}.
+     *
+     * @throws IllegalArgumentException if the number of statistics differs from the number of
+     *     fields this serializer was created for
+     */
+    @Override
+    public RowData toRow(FieldStats[] stats) {
+        int rowFieldCount = fieldGetters.length;
+        if (stats.length != rowFieldCount) {
+            // fail fast: the nested rows would otherwise not match the serializer schema
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Expecting %d field stats but found %d field stats",
+                            rowFieldCount, stats.length));
+        }
+
+        GenericRowData minValues = new GenericRowData(rowFieldCount);
+        GenericRowData maxValues = new GenericRowData(rowFieldCount);
+        long[] nullCounts = new long[rowFieldCount];
+        for (int i = 0; i < rowFieldCount; i++) {
+            minValues.setField(i, stats[i].minValue());
+            maxValues.setField(i, stats[i].maxValue());
+            nullCounts[i] = stats[i].nullCount();
+        }
+        return GenericRowData.of(minValues, maxValues, new GenericArrayData(nullCounts));
+    }
+
+    /** Restores the statistics from a row following {@link #schema(RowType)}. */
+    @Override
+    public FieldStats[] fromRow(RowData row) {
+        int rowFieldCount = fieldGetters.length;
+        RowData minValues = row.getRow(0, rowFieldCount);
+        RowData maxValues = row.getRow(1, rowFieldCount);
+        long[] nullCounts = row.getArray(2).toLongArray();
+
+        FieldStats[] stats = new FieldStats[rowFieldCount];
+        for (int i = 0; i < rowFieldCount; i++) {
+            stats[i] =
+                    new FieldStats(
+                            fieldGetters[i].getFieldOrNull(minValues),
+                            fieldGetters[i].getFieldOrNull(maxValues),
+                            nullCounts[i]);
+        }
+        return stats;
+    }
+
+    /**
+     * Returns the row type used to store the statistics of the given row type: the min values and
+     * max values as rows of nullable fields, and the null counts as an array of BIGINT NOT NULL.
+     */
+    public static RowType schema(RowType rowType) {
+        rowType = toAllFieldsNullableRowType(rowType);
+        List<RowType.RowField> fields = new ArrayList<>();
+        fields.add(new RowType.RowField("_MIN_VALUES", rowType));
+        fields.add(new RowType.RowField("_MAX_VALUES", rowType));
+        fields.add(new RowType.RowField("_NULL_COUNTS", new ArrayType(new BigIntType(false))));
+        return new RowType(fields);
+    }
+
+    /** Creates one {@link RowData.FieldGetter} per field of the given row type. */
+    public static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
+        return IntStream.range(0, rowType.getFieldCount())
+                .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                .toArray(RowData.FieldGetter[]::new);
+    }
+
+    /** Copies the given row type with every field type made nullable. */
+    private static RowType toAllFieldsNullableRowType(RowType rowType) {
+        // as stated in SstFile.RollingFile#finish, field stats are not collected currently so
+        // min/max values are all nulls
+        return RowType.of(
+                rowType.getFields().stream()
+                        .map(f -> f.getType().copy(true))
+                        .toArray(LogicalType[]::new),
+                rowType.getFieldNames().toArray(new String[0]));
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
new file mode 100644
index 0000000..d7fa281
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+/** Collector to extract statistics of each field from a series of records. */
+public class FieldStatsCollector {
+
+    private final Object[] minValues;
+    private final Object[] maxValues;
+    private final long[] nullCounts;
+
+    private final RowData.FieldGetter[] fieldGetters;
+
+    public FieldStatsCollector(RowType rowType) {
+        int numFields = rowType.getFieldCount();
+        this.minValues = new Object[numFields];
+        this.maxValues = new Object[numFields];
+        this.nullCounts = new long[numFields];
+        this.fieldGetters = FieldStatsArraySerializer.createFieldGetters(rowType);
+    }
+
+    /**
+     * Update the statistics with a new row data.
+     *
+     * <p><b>IMPORTANT</b>: Fields of this row should NOT be reused, as they're directly stored in
+     * the collector.
+     *
+     * @throws IllegalArgumentException if the arity of the row differs from the number of fields
+     *     of the row type this collector was created with
+     */
+    public void collect(RowData row) {
+        int numFields = fieldGetters.length;
+        // Flink's Preconditions only substitutes %s placeholders in the message template
+        Preconditions.checkArgument(
+                numFields == row.getArity(),
+                "Expecting row data with %s fields but found row data with %s fields",
+                numFields,
+                row.getArity());
+        for (int i = 0; i < numFields; i++) {
+            Object obj = fieldGetters[i].getFieldOrNull(row);
+            if (obj == null) {
+                nullCounts[i]++;
+                continue;
+            }
+
+            // TODO use comparator for not comparable types and extract this logic to a util class
+            if (!(obj instanceof Comparable)) {
+                continue;
+            }
+            // unchecked, but values of one field are expected to be mutually comparable
+            @SuppressWarnings("unchecked")
+            Comparable<Object> c = (Comparable<Object>) obj;
+            if (minValues[i] == null || c.compareTo(minValues[i]) < 0) {
+                minValues[i] = c;
+            }
+            if (maxValues[i] == null || c.compareTo(maxValues[i]) > 0) {
+                maxValues[i] = c;
+            }
+        }
+    }
+
+    /** Returns the statistics collected so far, one {@link FieldStats} per field. */
+    public FieldStats[] extract() {
+        FieldStats[] stats = new FieldStats[fieldGetters.length];
+        for (int i = 0; i < stats.length; i++) {
+            stats[i] = new FieldStats(minValues[i], maxValues[i], nullCounts[i]);
+        }
+        return stats;
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
new file mode 100644
index 0000000..520be75
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.store.file.utils.ObjectSerializerTestBase;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FieldStatsArraySerializer}. */
+public class FieldStatsArraySerializerTest extends ObjectSerializerTestBase<FieldStats[]> {
+
+    public TestKeyValueGenerator gen = new TestKeyValueGenerator();
+
+    @Override
+    protected ObjectSerializer<FieldStats[]> serializer() {
+        return new FieldStatsArraySerializer(TestKeyValueGenerator.ROW_TYPE);
+    }
+
+    /** Builds the stats of 10 generated rows, then blanks out a random selection of fields. */
+    @Override
+    protected FieldStats[] object() {
+        FieldStatsCollector statsCollector =
+                new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
+        for (int remaining = 10; remaining > 0; remaining--) {
+            statsCollector.collect(gen.next().value());
+        }
+        FieldStats[] stats = statsCollector.extract();
+        int numFields = stats.length;
+
+        // as stated in SstFile.RollingFile#finish, field stats are not collected currently so
+        // min/max values are all nulls
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+        for (int remaining = rnd.nextInt(numFields + 1); remaining > 0; remaining--) {
+            stats[rnd.nextInt(numFields)] = new FieldStats(null, null, 0);
+        }
+
+        return stats;
+    }
+
+    @Override
+    protected void checkResult(FieldStats[] expected, FieldStats[] actual) {
+        assertThat(actual).isEqualTo(expected);
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
new file mode 100644
index 0000000..36630f4
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FieldStatsCollector}. */
+public class FieldStatsCollectorTest {
+
+    @Test
+    public void testCollect() {
+        RowType rowType =
+                RowType.of(new IntType(), new VarCharType(10), new ArrayType(new IntType()));
+        FieldStatsCollector collector = new FieldStatsCollector(rowType);
+
+        // the expected stats of the array field never carry min/max values
+        collector.collect(GenericRowData.of(1, string("Flink"), intArray(1, 10)));
+        assertStats(
+                collector,
+                new FieldStats(1, 1, 0),
+                new FieldStats(string("Flink"), string("Flink"), 0),
+                new FieldStats(null, null, 0));
+
+        // a larger int and a null string
+        collector.collect(GenericRowData.of(3, null, intArray(3, 30)));
+        assertStats(
+                collector,
+                new FieldStats(1, 3, 0),
+                new FieldStats(string("Flink"), string("Flink"), 1),
+                new FieldStats(null, null, 0));
+
+        // a null int and a string smaller than the current minimum
+        collector.collect(GenericRowData.of(null, string("Apache"), intArray(2, 20)));
+        assertStats(
+                collector,
+                new FieldStats(1, 3, 1),
+                new FieldStats(string("Apache"), string("Flink"), 1),
+                new FieldStats(null, null, 0));
+
+        // values inside the current ranges and a null array
+        collector.collect(GenericRowData.of(2, string("Batch"), null));
+        assertStats(
+                collector,
+                new FieldStats(1, 3, 1),
+                new FieldStats(string("Apache"), string("Flink"), 1),
+                new FieldStats(null, null, 1));
+    }
+
+    /** Asserts that the collector currently reports exactly the given stats, in field order. */
+    private static void assertStats(FieldStatsCollector collector, FieldStats... expected) {
+        assertThat(collector.extract()).isEqualTo(expected);
+    }
+
+    /** Shorthand for {@link StringData#fromString(String)}. */
+    private static StringData string(String value) {
+        return StringData.fromString(value);
+    }
+
+    /** Shorthand for a {@link GenericArrayData} over the given ints. */
+    private static GenericArrayData intArray(int... values) {
+        return new GenericArrayData(values);
+    }
+}

Reply via email to