liudi1184 opened a new issue #2028:
URL: https://github.com/apache/iceberg/issues/2028
I want to use the following code to merge (compact) small files with `Actions` in a Flink job:
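```java
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setMaxParallelism(1);
// registerSerializer();

// HadoopCatalogHolder and logger are defined elsewhere in my project
HadoopCatalog hadoopCatalog = HadoopCatalogHolder.getHadoopCatalog();
List<Namespace> namespaces = hadoopCatalog.listNamespaces();
for (Namespace namespace : namespaces) {
    List<TableIdentifier> tableIdentifiers = hadoopCatalog.listTables(namespace);
    for (TableIdentifier tableIdentifier : tableIdentifiers) {
        logger.info("Start merging {}", tableIdentifier.toString());
        // Rewrite each table's data files into files of roughly 128 MB
        Actions.forTable(env, hadoopCatalog.loadTable(tableIdentifier))
            .rewriteDataFiles()
            .maxParallelism(1)
            .targetSizeInBytes(1024 * 1024 * 128)
            .execute();
    }
}
env.execute("Iceberg file merge");
```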
But it always fails with the following exception:
```
Caused by: java.io.IOException: Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions.
Serializer is org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@9ef07cb2
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:158) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
nullValueCounts (org.apache.iceberg.GenericDeleteFile)
deletes (org.apache.iceberg.BaseFileScanTask)
fileScanTask (org.apache.iceberg.BaseFileScanTask$SplitScanTask)
tasks (org.apache.iceberg.BaseCombinedScanTask)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:155) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.UnsupportedOperationException
```
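The innermost cause is a bare `java.lang.UnsupportedOperationException`, raised while Kryo reads the `nullValueCounts` field of `org.apache.iceberg.GenericDeleteFile`. One plausible mechanism, assumed here rather than confirmed by the trace, is that this field holds a read-only map and that Kryo's map deserialization rebuilds the map by calling `put()` on an empty instance of the same type. The plain-JDK sketch below only illustrates that failure mode: `ImmutableMapPutDemo` and `rebuildInto` are hypothetical names, and `Collections.unmodifiableMap` is a stand-in for whatever map type Iceberg actually stores in that field.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableMapPutDemo {

    // Simplified model of a generic map deserializer (assumption): take an empty
    // map of the target kind and re-insert every serialized entry with put().
    static Map<Integer, Long> rebuildInto(Map<Integer, Long> target, Map<Integer, Long> source) {
        for (Map.Entry<Integer, Long> entry : source.entrySet()) {
            target.put(entry.getKey(), entry.getValue());
        }
        return target;
    }

    public static void main(String[] args) {
        // Example per-column null counts (made-up values for illustration only)
        Map<Integer, Long> counts = new HashMap<>();
        counts.put(1, 0L);
        counts.put(2, 5L);

        // Hypothetical stand-in for a read-only nullValueCounts map
        Map<Integer, Long> readOnlyTarget = Collections.unmodifiableMap(new HashMap<>());
        try {
            rebuildInto(readOnlyTarget, counts);
            System.out.println("rebuilt read-only map");
        } catch (UnsupportedOperationException e) {
            // put() on an unmodifiable map always throws this exception
            System.out.println("read-only target: UnsupportedOperationException");
        }

        // A plain mutable map accepts the same sequence of puts
        Map<Integer, Long> mutableTarget = rebuildInto(new HashMap<>(), counts);
        System.out.println("mutable target size: " + mutableTarget.size());
    }
}
```

Running `main` prints `read-only target: UnsupportedOperationException` followed by `mutable target size: 2`. If this is indeed the cause, the direction for a workaround would be a Kryo serializer that knows how to rebuild that map type, or storing a mutable copy in the field; both are assumptions to verify against the Iceberg version in use.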