Sorry, I realize I posted this to the wrong list, please ignore and I'll post it to the flink-user one.
On Sun, Feb 14, 2021 at 11:41 AM Svend Vanderveken <sv...@kelesia.com> wrote: > Hi all, > > I'm failing to setup an example of wire serialization with Protobuf, could > you help me figure out what I'm doing wrong? > > I'm using a simple protobuf schema: > > ``` > > syntax = "proto3"; > > import "google/protobuf/wrappers.proto"; > option java_multiple_files = true; > > message DemoUserEvent { > Metadata metadata = 1; > oneof payload { > Created created = 10; > Updated updated = 11; > } > > message Created {...} > > message Updated {...} > > ... > > } > > ``` > > > From which I'm generating java from this Gradle plugin: > > > ``` > > plugins { > id "com.google.protobuf" version "0.8.15" > } > > ``` > > > And I'm generating DemoUserEvent instances with Java Iterator looking like > this: > > > ``` > > public class UserEventGenerator implements Iterator<DemoUserEvent>, > Serializable { > transient public final static Faker faker = new Faker(); > ... > @Override public DemoUserEvent next() { > return randomCreatedEvent(); > > } > > ... > > ``` > > > I read those two pieces of documentation: > * > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html > * > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html > > And tried the demo app below: > > ``` > > import com.twitter.chill.protobuf.ProtobufSerializer; > > ... > > public static void main(String[] args) { > final StreamExecutionEnvironment flinkEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > flinkEnv.getConfig().registerTypeWithKryoSerializer(DemoUserEvent.class, > ProtobufSerializer.class); > flinkEnv.fromCollection(new UserEventGenerator(), > DemoUserEvent.class).print(); > } > > ``` > > But the serialization mechanism still fails to handle my protobuf class: > > 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor > [] - class live.schema.event.user.v1.DemoUserEvent does not contain a > getter for field payloadCase_ > 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor > [] - class live.schema.event.user.v1.DemoUserEvent does not contain a > setter for field payloadCase_ > 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor > [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as > a POJO type because not all fields are valid POJO fields, and must be > processed as GenericType. Please read the Flink documentation on "Data Types > & Serialization" for details of the effect on performance. > > I've also tried this, without success: > > ``` > > flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, > ProtobufSerializer.class); > > ``` > > > I'm using those versions: > > ``` > > ext { > javaVersion = '11' > flinkVersion = '1.12.1' > scalaBinaryVersion = '2.12' > } > > dependencies { > compileOnly > "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" > implementation ("com.twitter:chill-protobuf:0.9.5") { > exclude group: 'com.esotericsoftware.kryo', module: 'kryo' > } > implementation "com.google.protobuf:protobuf-java:3.14.0" > implementation 'com.github.javafaker:javafaker:1.0.2' > } > > ``` > > > Any idea what I should try next? > > Thanks in advance! > > > > -- Svend Vanderveken Kelesia SPRL - BE 0839 049 010 blog: https://svend.kelesia.com <http://svend.kelesia.com/> Twitter: @sv3ndk <https://twitter.com/sv3ndk>