simonykq commented on issue #10710:
URL: https://github.com/apache/iceberg/issues/10710#issuecomment-2457942633
Btw, I found a way to get this to work (without enabling generic types, but
still use kyro to serialize the write result ender the hood).
First create a class called `WriteResultTypeInformation`:
```
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.iceberg.io.WriteResult;
import java.lang.reflect.Type;
import java.util.Map;
public class WriteResultTypeInformation extends TypeInfoFactory<WriteResult>
{
@Override
public TypeInformation<WriteResult> createTypeInfo(Type t,
Map<String, TypeInformation<?>> genericParameters) {
return new TypeInformation<>() {
@Override
public boolean isBasicType() {
return false;
}
@Override
public boolean isTupleType() {
return false;
}
@Override
public int getArity() {
return 3;
}
@Override
public int getTotalFields() {
return 3;
}
@Override
public Class<WriteResult> getTypeClass() {
return WriteResult.class;
}
@Override
public boolean isKeyType() {
return false;
}
@Override
public TypeSerializer<WriteResult>
createSerializer(ExecutionConfig config) {
return new KryoSerializer<>(this.getTypeClass(), config);
}
@Override
public String toString() {
return null;
}
@Override
public boolean equals(Object obj) {
return false;
}
@Override
public int hashCode() {
return 0;
}
@Override
public boolean canEqual(Object obj) {
return false;
}
};
}
}
```
and then in the config, put:
```
pipeline.serialization-config:
- org.apache.iceberg.io.WriteResult: {type: typeinfo, class:
org.apache.iceberg.io.WriteResultSerializer}
```
or programatically:
```
Configuration config = new Configuration();
config.set(PipelineOptions.SERIALIZATION_CONFIG,
List.of("org.apache.iceberg.io.WriteResult: {type: typeinfo,
class: org.apache.iceberg.io.WriteResultSerializer}"));
see = StreamExecutionEnvironment.getExecutionEnvironment(config);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]