Sorry, I realize I posted this to the wrong list. Please ignore it; I'll
post it to the flink-user list instead.

On Sun, Feb 14, 2021 at 11:41 AM Svend Vanderveken <sv...@kelesia.com>
wrote:

> Hi all,
>
> I'm failing to set up an example of wire serialization with Protobuf. Could
> you help me figure out what I'm doing wrong?
>
> I'm using a simple protobuf schema:
>
> ```
>
> syntax = "proto3";
>
> import "google/protobuf/wrappers.proto";
> option java_multiple_files = true;
>
> message DemoUserEvent {
>   Metadata metadata = 1;
>   oneof payload {
>     Created created = 10;
>     Updated updated = 11;
>   }
>
>   message Created {...}
>
>   message Updated {...}
>
>   ...
>
> }
>
> ```
>
>
> From which I'm generating Java code with this Gradle plugin:
>
>
> ```
>
> plugins {
>     id "com.google.protobuf" version "0.8.15"
> }
>
> ```
>
>
> And I'm generating DemoUserEvent instances with a Java Iterator that looks
> like this:
>
>
> ```
>
> public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable {
>     // static fields are never serialized, so no `transient` is needed here
>     public static final Faker faker = new Faker();
>     ...
>     @Override
>     public DemoUserEvent next() {
>         return randomCreatedEvent();
>     }
>
>     ...
>
> ```
>
>
> I read these two pieces of documentation:
> * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
> * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>
> And tried the demo app below:
>
> ```
>
> import com.twitter.chill.protobuf.ProtobufSerializer;
>
> ...
>
> public static void main(String[] args) {
>     final StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>     flinkEnv.getConfig().registerTypeWithKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
>     flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print();
> }
>
> ```
>
> But the serialization mechanism still fails to handle my protobuf class:
>
> 11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a getter for field payloadCase_
> 11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a setter for field payloadCase_
> 11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
>
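
A note on the log excerpt above: these lines are logged at INFO level and say
that DemoUserEvent will be processed as a GenericType, which Flink serializes
through Kryo, so on their own they may not indicate that the registered
serializer is being ignored. The sketch below is a simplified illustration, in
plain Java reflection, of the kind of getter/setter check that produces such
messages. It is not Flink's TypeExtractor code: the class names
(PojoCheckSketch, FakeGeneratedMessage, PlainBean) are hypothetical, and the
name-matching and type rules are assumptions made for the example.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class PojoCheckSketch {

    // Hypothetical stand-in for a protobuf-generated message: a private
    // int field with a trailing underscore, an accessor that returns an
    // enum instead of the field's type, and no setter at all.
    public static class FakeGeneratedMessage {
        public enum PayloadCase { CREATED, UPDATED, PAYLOAD_NOT_SET }
        private int payloadCase_ = 0;
        public PayloadCase getPayloadCase() { return PayloadCase.PAYLOAD_NOT_SET; }
    }

    // Hypothetical plain bean with a matching getter and setter.
    public static class PlainBean {
        private String name;
        public PlainBean() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Assumption of this sketch: names are compared in lower case with
    // underscores removed.
    private static String normalize(String s) {
        return s.toLowerCase(Locale.ROOT).replace("_", "");
    }

    private static boolean hasGetter(Class<?> type, Field f) {
        for (Method m : type.getMethods()) {
            if (normalize(m.getName()).equals("get" + normalize(f.getName()))
                    && m.getParameterCount() == 0
                    && m.getReturnType().equals(f.getType())) {
                return true;
            }
        }
        return false;
    }

    private static boolean hasSetter(Class<?> type, Field f) {
        for (Method m : type.getMethods()) {
            if (normalize(m.getName()).equals("set" + normalize(f.getName()))
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].equals(f.getType())) {
                return true;
            }
        }
        return false;
    }

    // Returns the names of non-public instance fields that lack a
    // matching getter or setter under the simplified rules above.
    public static List<String> invalidPojoFields(Class<?> type) {
        List<String> invalid = new ArrayList<>();
        for (Field f : type.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) {
                continue; // not part of the serialized state
            }
            if (Modifier.isPublic(mods)) {
                continue; // public fields need no accessors
            }
            if (!hasGetter(type, f) || !hasSetter(type, f)) {
                invalid.add(f.getName());
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        System.out.println(invalidPojoFields(FakeGeneratedMessage.class)); // prints [payloadCase_]
        System.out.println(invalidPojoFields(PlainBean.class));            // prints []
    }
}
```

A class that fails a check of this kind is not rejected; it only falls back
from the POJO path to the generic (Kryo) path.
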
> I've also tried this, without success:
>
> ```
>
> flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
>
> ```
>
>
> I'm using these versions:
>
> ```
>
> ext {
>     javaVersion = '11'
>     flinkVersion = '1.12.1'
>     scalaBinaryVersion = '2.12'
> }
>
> dependencies {
>     compileOnly "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>     implementation ("com.twitter:chill-protobuf:0.9.5") {
>         exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
>     }
>     implementation "com.google.protobuf:protobuf-java:3.14.0"
>     implementation 'com.github.javafaker:javafaker:1.0.2'
> }
>
> ```
>
>
> Any idea what I should try next?
>
> Thanks in advance!
>
>
>
>

-- 
Svend Vanderveken
Kelesia SPRL - BE 0839 049 010
blog: https://svend.kelesia.com <http://svend.kelesia.com/>
Twitter: @sv3ndk <https://twitter.com/sv3ndk>
