Guosmilesmile opened a new pull request, #11662:
URL: https://github.com/apache/iceberg/pull/11662
When the distribution mode is set to RANGE and a partition field in the data
contains null values, SortKey serialization fails and the job restarts
continuously.
For example, with the partition `bucket(5, name)`, if some rows have a null
`name`, the `DataStatisticsOperator` throws an error when serializing its
statistics to send them to the coordinator.
In `SortKeySerializer.serialize`:
```
public void serialize(SortKey record, DataOutputView target) throws IOException {
  for (int i = 0; i < size; ++i) {
    int fieldId = transformedFields[i].fieldId();
    Type.TypeID typeId = transformedFields[i].type().typeId();
    switch (typeId) {
      case BOOLEAN:
        target.writeBoolean(record.get(i, Boolean.class));
        break;
      case INTEGER:
      case DATE:
        target.writeInt(record.get(i, Integer.class));
        break;
      ...
}
```
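The NullPointerException comes from auto-unboxing: for a row with a null partition value, `record.get(i, Integer.class)` returns `null`, and passing that to a method with a primitive `int` parameter (like `DataOutputView#writeInt`) triggers an implicit `Integer.intValue()` call. A minimal standalone sketch of this failure mode (the `writeInt` stand-in and class name are hypothetical, not the actual serializer):

```java
// Demonstrates the unboxing NPE that SortKeySerializer.serialize hits
// when a transformed partition field resolves to null.
public class UnboxingNpeDemo {

  // Stand-in for DataOutputView#writeInt: the primitive parameter
  // forces Integer -> int unboxing at the call site.
  static void writeInt(int value) {
    // no-op sink
  }

  // Returns true if serializing the given (possibly null) field value
  // would throw the NPE seen in the stack trace above.
  public static boolean failsOnNull(Integer fieldValue) {
    try {
      writeInt(fieldValue); // unboxes; throws NPE when fieldValue == null
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(failsOnNull(null)); // true: null field breaks serialization
    System.out.println(failsOnNull(42));   // false: non-null values are fine
  }
}
```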
Error log:
```
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.NullPointerException
	at org.apache.iceberg.flink.sink.shuffle.SortKeySerializer.serialize(SortKeySerializer.java:133) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.iceberg.flink.sink.shuffle.SortKeySerializer.serialize(SortKeySerializer.java:49) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.flink.api.common.typeutils.base.MapSerializer.serialize(MapSerializer.java:136) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.iceberg.flink.sink.shuffle.DataStatisticsSerializer.serialize(DataStatisticsSerializer.java:113) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.iceberg.flink.sink.shuffle.DataStatisticsSerializer.serialize(DataStatisticsSerializer.java:38) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.iceberg.flink.sink.shuffle.StatisticsUtil.serializeDataStatistics(StatisticsUtil.java:46) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.iceberg.flink.sink.shuffle.StatisticsEvent.createTaskStatisticsEvent(StatisticsEvent.java:51) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperator.snapshotState(DataStatisticsOperator.java:227) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:234) ~[flink-dist-1.19.1.jar:1.19.1]
```
We think that whenever a field is used as a partition source with a transform
applied, this scenario is bound to occur once the field contains null values.
We believe the statistics collection can simply ignore rows whose relevant
partition field is null. If the number of such rows is small, skipping them
should not affect the job's execution; if it is large, falling back to the
default round-robin strategy for them is also appropriate.
This PR traverses the contents of the partition fields and skips statistics
collection for any record that contains a null value in them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]