fengjiajie opened a new pull request, #10695:
URL: https://github.com/apache/iceberg/pull/10695
In my use case with Iceberg 1.3, I have a Flink `function-1` that outputs a
`DataStream<DataFile>`, which is then processed by the next function. The
simplified code for `function-1` is as follows:
```java
// Inside function-1:
Map<Integer, Long> columnSizes = new HashMap<>();
columnSizes.put(1, 234L);
DataFile dataFile = DataFiles.builder(icebergTable.spec())
    .withMetrics(new Metrics(123L, columnSizes, ...))
    ...
    .build();

// Move file to new path, then rebuild DataFile
DataFile newDataFile = DataFiles.builder(icebergTable.spec())
    .copy(dataFile)
    .withPath("file:///new_path")
    .build();
```
If I return `dataFile`, Flink's Kryo serializer deserializes it correctly
in the next function. However, if I return `newDataFile` (rebuilt with
`copy`), Kryo fails with the following exception:
```
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
    ...
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    ... 4 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
columnSizes (org.apache.iceberg.GenericDataFile)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ...
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableMap.put(Collections.java:1459)
    ...
```
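The bottom frame of the trace is a `put` call on a `Collections$UnmodifiableMap`. The sketch below reproduces just that failure with plain JDK classes. The `refill` method is a hypothetical stand-in for a deserializer that populates a target map entry by entry; it is an assumption made for illustration, not Kryo's actual code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableMapRepro {

  // Hypothetical stand-in for a map deserializer: it fills an existing
  // target instance entry by entry via put().
  static <K, V> void refill(Map<K, V> target, Map<K, V> entries) {
    for (Map.Entry<K, V> e : entries.entrySet()) {
      target.put(e.getKey(), e.getValue());
    }
  }

  // Returns true if refilling the target throws UnsupportedOperationException.
  static <K, V> boolean refillFails(Map<K, V> target, Map<K, V> entries) {
    try {
      refill(target, entries);
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Map<Integer, Long> columnSizes = new HashMap<>();
    columnSizes.put(1, 234L);

    Map<Integer, Long> mutable = new HashMap<>();
    Map<Integer, Long> readOnly = Collections.unmodifiableMap(new HashMap<>());

    System.out.println("HashMap target fails: " + refillFails(mutable, columnSizes));
    System.out.println("unmodifiable target fails: " + refillFails(readOnly, columnSizes));
  }
}
```

A `HashMap` target accepts the entries, while the unmodifiable wrapper rejects the first `put`, which matches the innermost `Caused by` in the trace.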
This issue arises in Iceberg 1.5 but not in 1.3. The root cause lies in
the `toReadableMap` method of `org.apache.iceberg.BaseFile`:
```java
// Iceberg 1.3:
private static <K, V> Map<K, V> toReadableMap(Map<K, V> map) {
  return map instanceof SerializableMap ?
      ((SerializableMap)map).immutableMap() : map;
}

// Iceberg 1.5:
private static <K, V> Map<K, V> toReadableMap(Map<K, V> map) {
  if (map == null) {
    return null;
  } else if (map instanceof SerializableMap) {
    return ((SerializableMap<K, V>) map).immutableMap();
  } else {
    return Collections.unmodifiableMap(map);
  }
}
```
In Iceberg 1.5, `toReadableMap` wraps any map that is not a `SerializableMap`
in `Collections.unmodifiableMap`. When Kryo later deserializes the copied
`DataFile`, it calls `put` on that unmodifiable wrapper (see the `columnSizes`
entry in the serialization trace), which throws
`UnsupportedOperationException`. Returning an unmodifiable view from
`toReadableMap` seems correct, so the fix may belong in the `copy` path, which
could rebuild these maps as regular mutable maps.
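As a rough illustration of rebuilding a read-only map as a regular mutable one, here is a minimal JDK-only sketch. The helper name `toMutableCopy` is hypothetical and not part of Iceberg; where such a copy would actually be made inside `copy` is left open.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MutableMapCopy {

  // Hypothetical helper, not an Iceberg API: returns an independent
  // HashMap copy of the input, or null when the input is null.
  static <K, V> Map<K, V> toMutableCopy(Map<K, V> map) {
    return map == null ? null : new HashMap<>(map);
  }

  public static void main(String[] args) {
    Map<Integer, Long> source = new HashMap<>();
    source.put(1, 234L);

    // Simulates the read-only view handed out for a map such as columnSizes.
    Map<Integer, Long> readOnly = Collections.unmodifiableMap(source);

    // The copy is a plain HashMap, so later put() calls succeed.
    Map<Integer, Long> copy = toMutableCopy(readOnly);
    copy.put(2, 567L);

    System.out.println(copy.getClass().getName()); // java.util.HashMap
    System.out.println(readOnly.containsKey(2));   // false: the copy is independent
  }
}
```

Copying costs one extra allocation per map, but the resulting object no longer carries a `Collections$UnmodifiableMap` instance for a serializer to stumble over.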