mxm commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1871472107
##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java:
##########
@@ -73,12 +73,25 @@ static byte[] serializeCompletedStatistics(
   }
   static CompletedStatistics deserializeCompletedStatistics(
-      byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
+      byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
     try {
       DataInputDeserializer input = new DataInputDeserializer(bytes);
-      return statisticsSerializer.deserialize(input);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
+      CompletedStatistics completedStatistics = statisticsSerializer.deserialize(input);
+      if (!completedStatistics.isValid()) {
+        throw new RuntimeException("Fail to deserialize aggregated statistics,change to v1");
+      }
+
+      return completedStatistics;
+    } catch (Exception e) {
+      try {
+        statisticsSerializer.changeSortKeySerializerVersion(1);
+        DataInputDeserializer input = new DataInputDeserializer(bytes);
+        CompletedStatistics deserialize = statisticsSerializer.deserialize(input);
+        statisticsSerializer.changeSortKeySerializerVersionLatest();
+        return deserialize;
+      } catch (IOException ioException) {
+        throw new UncheckedIOException("Fail to deserialize aggregated statistics", ioException);
+      }
Review Comment:
We could add a comment here, explaining why switching versions was necessary.
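
For illustration, a minimal sketch of what such a comment could look like. It assumes the fallback exists so that bytes written with the older (v1) sort key format can still be read. The diff does not state the reason, so the comment wording is a guess. `ToySerializer` and its two wire formats are hypothetical stand-ins, not Iceberg classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class VersionedFallbackSketch {

  /** Hypothetical serializer whose wire format depends on a mutable version. */
  public static final class ToySerializer {
    public static final int LATEST = 2;
    private int version = LATEST;

    public void setVersion(int version) {
      this.version = version;
    }

    public int version() {
      return version;
    }

    public byte[] serialize(String value) {
      try {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        if (version >= 2) {
          // Assumed format change: v2 prefixes the value with a null marker.
          out.writeBoolean(value == null);
        }
        if (value != null) {
          out.writeUTF(value);
        }
        return bytes.toByteArray();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }

    public String deserialize(byte[] data) throws IOException {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      if (version >= 2 && in.readBoolean()) {
        return null;
      }
      return in.readUTF();
    }
  }

  public static String deserializeWithFallback(byte[] data, ToySerializer serializer) {
    try {
      return serializer.deserialize(data);
    } catch (IOException latestFormatFailure) {
      // Why we switch versions (assumed rationale): the bytes may have been
      // written by an older job using the v1 format, which the latest reader
      // cannot parse. Retry once with v1, then restore the latest version so
      // that later writes use the current format.
      serializer.setVersion(1);
      try {
        return serializer.deserialize(data);
      } catch (IOException v1Failure) {
        throw new UncheckedIOException("Fail to deserialize aggregated statistics", v1Failure);
      } finally {
        serializer.setVersion(ToySerializer.LATEST);
      }
    }
  }

  public static void main(String[] args) {
    ToySerializer serializer = new ToySerializer();
    serializer.setVersion(1);
    byte[] oldBytes = serializer.serialize("abc"); // simulate state written with v1
    serializer.setVersion(ToySerializer.LATEST);

    System.out.println(deserializeWithFallback(oldBytes, serializer)); // prints "abc"
    System.out.println(serializer.version()); // prints 2
  }
}
```

The `finally` block in the sketch restores the latest version even when the v1 read fails. In the diff, `changeSortKeySerializerVersionLatest()` runs only after a successful read, so the two differ on the failure path.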
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]