This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 4ddd90cc69eed8178901c44222e6ebbc06bb87a1 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Wed Jan 12 19:38:20 2022 +0800 [FLINK-25628] Introduce FieldStats Co-authored-by: tsreaper <tsreape...@gmail.com> --- .../flink/table/store/file/stats/FieldStats.java | 70 +++++++++++++++ .../file/stats/FieldStatsArraySerializer.java | 100 +++++++++++++++++++++ .../store/file/stats/FieldStatsCollector.java | 82 +++++++++++++++++ .../file/stats/FieldStatsArraySerializerTest.java | 62 +++++++++++++ .../store/file/stats/FieldStatsCollectorTest.java | 98 ++++++++++++++++++++ 5 files changed, 412 insertions(+) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java new file mode 100644 index 0000000..62843bf --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.stats; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** Statistics for each field. */ +public class FieldStats { + + @Nullable private final Object minValue; + @Nullable private final Object maxValue; + private final long nullCount; + + public FieldStats(Object minValue, Object maxValue, long nullCount) { + this.minValue = minValue; + this.maxValue = maxValue; + this.nullCount = nullCount; + } + + public Object minValue() { + return minValue; + } + + public Object maxValue() { + return maxValue; + } + + public long nullCount() { + return nullCount; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof FieldStats)) { + return false; + } + FieldStats that = (FieldStats) o; + return Objects.equals(minValue, that.minValue) + && Objects.equals(maxValue, that.maxValue) + && nullCount == that.nullCount; + } + + @Override + public int hashCode() { + return Objects.hash(minValue, maxValue, nullCount); + } + + @Override + public String toString() { + return String.format("{%s, %s, %d}", minValue, maxValue, nullCount); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java new file mode 100644 index 0000000..6a7cc7f --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.stats; + +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.utils.ObjectSerializer; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.IntStream; + +/** Serializer for array of {@link FieldStats}. */ +public class FieldStatsArraySerializer extends ObjectSerializer<FieldStats[]> { + + private final RowData.FieldGetter[] fieldGetters; + + public FieldStatsArraySerializer(RowType rowType) { + super(schema(rowType)); + this.fieldGetters = createFieldGetters(toAllFieldsNullableRowType(rowType)); + } + + @Override + public RowData toRow(FieldStats[] stats) { + int rowFieldCount = stats.length; + GenericRowData minValues = new GenericRowData(rowFieldCount); + GenericRowData maxValues = new GenericRowData(rowFieldCount); + long[] nullCounts = new long[rowFieldCount]; + for (int i = 0; i < rowFieldCount; i++) { + minValues.setField(i, stats[i].minValue()); + maxValues.setField(i, stats[i].maxValue()); + nullCounts[i] = stats[i].nullCount(); + } + return GenericRowData.of(minValues, maxValues, new GenericArrayData(nullCounts)); + } + + @Override + public FieldStats[] fromRow(RowData row) { + int rowFieldCount = fieldGetters.length; + RowData minValues = row.getRow(0, rowFieldCount); + RowData maxValues = row.getRow(1, rowFieldCount); + long[] nullValues = row.getArray(2).toLongArray(); + + FieldStats[] stats = new FieldStats[rowFieldCount]; + for (int i = 0; i < rowFieldCount; i++) { + stats[i] = + new FieldStats( + fieldGetters[i].getFieldOrNull(minValues), + fieldGetters[i].getFieldOrNull(maxValues), + nullValues[i]); + } + return stats; + } + + public static RowType schema(RowType rowType) { + rowType = toAllFieldsNullableRowType(rowType); + List<RowType.RowField> fields = new ArrayList<>(); + fields.add(new RowType.RowField("_MIN_VALUES", rowType)); + fields.add(new RowType.RowField("_MAX_VALUES", rowType)); + fields.add(new RowType.RowField("_NULL_COUNTS", new ArrayType(new BigIntType(false)))); + return new RowType(fields); + } + + public static RowData.FieldGetter[] createFieldGetters(RowType rowType) { + return IntStream.range(0, rowType.getFieldCount()) + .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i)) + .toArray(RowData.FieldGetter[]::new); + } + + private static RowType toAllFieldsNullableRowType(RowType rowType) { + // as stated in SstFile.RollingFile#finish, field stats are not collected currently so + // min/max values are all nulls + return RowType.of( + rowType.getFields().stream() + .map(f -> f.getType().copy(true)) + .toArray(LogicalType[]::new), + rowType.getFieldNames().toArray(new String[0])); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java new file mode 100644 index 0000000..d7fa281 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.stats; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +/** Collector to extract statistics of each fields from a series of records. */ +public class FieldStatsCollector { + + private final Object[] minValues; + private final Object[] maxValues; + private final long[] nullCounts; + + private final RowData.FieldGetter[] fieldGetters; + + public FieldStatsCollector(RowType rowType) { + int numFields = rowType.getFieldCount(); + this.minValues = new Object[numFields]; + this.maxValues = new Object[numFields]; + this.nullCounts = new long[numFields]; + this.fieldGetters = FieldStatsArraySerializer.createFieldGetters(rowType); + } + + /** + * Update the statistics with a new row data. + * + * <p><b>IMPORTANT</b>: Fields of this row should NOT be reused, as they're directly stored in + * the collector. + */ + public void collect(RowData row) { + Preconditions.checkArgument( + fieldGetters.length == row.getArity(), + "Expecting row data with %d fields but found row data with %d fields", + fieldGetters.length, + row.getArity()); + for (int i = 0; i < row.getArity(); i++) { + Object obj = fieldGetters[i].getFieldOrNull(row); + if (obj == null) { + nullCounts[i]++; + continue; + } + + // TODO use comparator for not comparable types and extract this logic to a util class + if (!(obj instanceof Comparable)) { + continue; + } + Comparable<Object> c = (Comparable<Object>) obj; + if (minValues[i] == null || c.compareTo(minValues[i]) < 0) { + minValues[i] = c; + } + if (maxValues[i] == null || c.compareTo(maxValues[i]) > 0) { + maxValues[i] = c; + } + } + } + + public FieldStats[] extract() { + FieldStats[] stats = new FieldStats[fieldGetters.length]; + for (int i = 0; i < stats.length; i++) { + stats[i] = new FieldStats(minValues[i], maxValues[i], nullCounts[i]); + } + return stats; + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java new file mode 100644 index 0000000..520be75 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.stats; + +import org.apache.flink.table.store.file.TestKeyValueGenerator; +import org.apache.flink.table.store.file.utils.ObjectSerializer; +import org.apache.flink.table.store.file.utils.ObjectSerializerTestBase; + +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FieldStatsArraySerializer}. */ +public class FieldStatsArraySerializerTest extends ObjectSerializerTestBase<FieldStats[]> { + + public TestKeyValueGenerator gen = new TestKeyValueGenerator(); + + @Override + protected ObjectSerializer<FieldStats[]> serializer() { + return new FieldStatsArraySerializer(TestKeyValueGenerator.ROW_TYPE); + } + + @Override + protected FieldStats[] object() { + FieldStatsCollector collector = new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE); + for (int i = 0; i < 10; i++) { + collector.collect(gen.next().value()); + } + FieldStats[] result = collector.extract(); + + // as stated in SstFile.RollingFile#finish, field stats are not collected currently so + // min/max values are all nulls + ThreadLocalRandom random = ThreadLocalRandom.current(); + int numFieldsNotCollected = random.nextInt(result.length + 1); + for (int i = 0; i < numFieldsNotCollected; i++) { + result[random.nextInt(result.length)] = new FieldStats(null, null, 0); + } + + return result; + } + + @Override + protected void checkResult(FieldStats[] expected, FieldStats[] actual) { + assertThat(actual).isEqualTo(expected); + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java new file mode 100644 index 0000000..36630f4 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.stats; + +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FieldStatsCollector}. */ +public class FieldStatsCollectorTest { + + @Test + public void testCollect() { + RowType rowType = + RowType.of(new IntType(), new VarCharType(10), new ArrayType(new IntType())); + FieldStatsCollector collector = new FieldStatsCollector(rowType); + + collector.collect( + GenericRowData.of( + 1, + StringData.fromString("Flink"), + new GenericArrayData(new int[] {1, 10}))); + assertThat(collector.extract()) + .isEqualTo( + new FieldStats[] { + new FieldStats(1, 1, 0), + new FieldStats( + StringData.fromString("Flink"), + StringData.fromString("Flink"), + 0), + new FieldStats(null, null, 0) + }); + + collector.collect(GenericRowData.of(3, null, new GenericArrayData(new int[] {3, 30}))); + assertThat(collector.extract()) + .isEqualTo( + new FieldStats[] { + new FieldStats(1, 3, 0), + new FieldStats( + StringData.fromString("Flink"), + StringData.fromString("Flink"), + 1), + new FieldStats(null, null, 0) + }); + + collector.collect( + GenericRowData.of( + null, + StringData.fromString("Apache"), + new GenericArrayData(new int[] {2, 20}))); + assertThat(collector.extract()) + .isEqualTo( + new FieldStats[] { + new FieldStats(1, 3, 1), + new FieldStats( + StringData.fromString("Apache"), + StringData.fromString("Flink"), + 1), + new FieldStats(null, null, 0) + }); + + collector.collect(GenericRowData.of(2, StringData.fromString("Batch"), null)); + assertThat(collector.extract()) + .isEqualTo( + new FieldStats[] { + new FieldStats(1, 3, 1), + new FieldStats( + StringData.fromString("Apache"), + StringData.fromString("Flink"), + 1), + new FieldStats(null, null, 1) + }); + } +}