Re: should i expect POJO serialization warnings when dealing w/ kryo protobuf serialization?

2021-06-14 Thread Yun Gao
Hi Jin,

The warning is logged whenever Flink fails to analyze the type as a
POJO and falls back to the Kryo serializer. The registered
ProtobufSerializer acts as a plugin inside the Kryo serializer, so
the warning can safely be ignored. When serializing, Flink first
goes to the Kryo serializer, and the Kryo serializer then looks up the
registered "plugin serializer" and delegates to ProtobufSerializer.
To double-check, you can take a stack snapshot (thread dump) on the TM
side and confirm that ProtobufSerializer shows up in the call path.
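
As a rough illustration of the lookup described above, here is a minimal
sketch in plain Java. It is a toy model, not Flink's or Kryo's actual
code: ToySerializer, ToyGenericSerializer, and the placeholder FlatEvent
and Unregistered classes are hypothetical names invented for this
example. The only real API it mirrors is
ExecutionConfig.registerTypeWithKryoSerializer from the snippet in the
question, whose role is played by register() here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FallbackDispatchSketch {

    /** Hypothetical stand-in for a per-type serializer (not a Flink or Kryo type). */
    interface ToySerializer {
        String name();
    }

    /** Toy model of a generic serializer that keeps a registry of per-class "plugin" serializers. */
    static final class ToyGenericSerializer {
        private final Map<Class<?>, ToySerializer> registered = new LinkedHashMap<>();
        private final ToySerializer fallback = () -> "default-serializer";

        /** Plays the role of registering a serializer for one class. */
        void register(Class<?> type, ToySerializer serializer) {
            registered.put(type, serializer);
        }

        /** Returns the registered serializer for the class, or the fallback if none was registered. */
        ToySerializer resolve(Class<?> type) {
            return registered.getOrDefault(type, fallback);
        }
    }

    // Placeholder types; the real ones would be protobuf-generated classes.
    static final class FlatEvent {}
    static final class Unregistered {}

    /** Builds a registry with one plugin serializer and reports which serializer handles the type. */
    static String serializerNameFor(Class<?> type) {
        ToyGenericSerializer generic = new ToyGenericSerializer();
        generic.register(FlatEvent.class, () -> "protobuf-serializer");
        return generic.resolve(type).name();
    }

    public static void main(String[] args) {
        System.out.println(serializerNameFor(FlatEvent.class));    // prints "protobuf-serializer"
        System.out.println(serializerNameFor(Unregistered.class)); // prints "default-serializer"
    }
}
```

The point of the sketch is only that the "not a POJO" message and the
per-class registration live at different layers: the message is about
which outer serializer is chosen, the registration decides what the
outer serializer does for that class.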

Best,
Yun




should i expect POJO serialization warnings when dealing w/ kryo protobuf serialization?

2021-06-12 Thread Jin Yi
i'm currently using protobufs, and registering a kryo ProtobufSerializer
for each message type with the following snippet of code:

    // Register the kryo ProtobufSerializer for one protobuf class (no-op if null).
    static void optionalRegisterProtobufSerializer(ExecutionConfig config, Class<?> clazz) {
        if (clazz != null) {
            config.registerTypeWithKryoSerializer(clazz, ProtobufSerializer.class);
        }
    }

    // Register every protobuf message type used by the job.
    static void configureExecutionConfig(ExecutionConfig config) {
        optionalRegisterProtobufSerializer(config, User.class);
        optionalRegisterProtobufSerializer(config, View.class);
        optionalRegisterProtobufSerializer(config, Request.class);
        optionalRegisterProtobufSerializer(config, Insertion.class);
        optionalRegisterProtobufSerializer(config, Impression.class);
        optionalRegisterProtobufSerializer(config, Action.class);
        optionalRegisterProtobufSerializer(config, FlatEvent.class);
        optionalRegisterProtobufSerializer(config, LatestImpression.class);
    }

    // TODO - reuse with batch.
    void configureStreamExecutionEnvironment(StreamExecutionEnvironment env) {
        configureExecutionConfig(env.getConfig());
        if (checkpointInterval > 0) {
            env.enableCheckpointing(checkpointInterval);
        }
        env.getCheckpointConfig().setCheckpointingMode(checkpointingMode);
        // TODO - evaluate if we want setMinPauseBetweenCheckpoints.
        if (minPauseBetweenCheckpoints > 0) {
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints);
        }
        if (unalignedCheckpoints) {
            env.getCheckpointConfig().enableUnalignedCheckpoints();
        }
        if (checkpointTimeout > 0) {
            env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout);
        }
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }

what concerns me is that i'm seeing info logs like these in the
taskmanager logs:

    org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class ai.promoted.proto.event.FlatEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class ai.promoted.proto.event.LatestImpression does not contain a getter for field impressionId_
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class ai.promoted.proto.event.LatestImpression does not contain a setter for field impressionId_
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class ai.promoted.proto.event.LatestImpression cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class ai.promoted.proto.event.LatestImpression does not contain a getter for field impressionId_
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class ai.promoted.proto.event.LatestImpression does not contain a setter for field impressionId_
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class ai.promoted.proto.event.LatestImpression cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

can i safely ignore these?  is it telling me that it's doing the right
thing since kryo should kick in for GenericType?
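
For readers wondering why the "does not contain a getter/setter"
messages appear at all, the following is a small reflection sketch of a
getter/setter rule of the kind the log lines describe. It is a
simplified approximation written for illustration, not Flink's
TypeExtractor: the matching rules (names compared case-insensitively
with underscores ignored, getter return type and setter parameter type
equal to the field type) are assumptions, and LatestImpressionLike is a
hypothetical class that only loosely imitates a generated protobuf
message (a private trailing-underscore field, a getter with a different
declared type, no setter).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class PojoFieldCheckSketch {

    /** Hypothetical class loosely shaped like a generated protobuf message. */
    public static class LatestImpressionLike {
        private Object impressionId_ = "";
        public String getImpressionId() { return String.valueOf(impressionId_); }
    }

    /** Hypothetical plain bean that satisfies the getter/setter rule. */
    public static class PlainBean {
        private String impressionId;
        public PlainBean() {}
        public String getImpressionId() { return impressionId; }
        public void setImpressionId(String value) { this.impressionId = value; }
    }

    /** Lower-cases a name and drops underscores so impressionId_ and getImpressionId can be compared. */
    static String normalize(String name) {
        return name.toLowerCase(Locale.ROOT).replace("_", "");
    }

    /** Lists the non-public instance fields that lack a matching public getter or setter. */
    static List<String> problems(Class<?> clazz) {
        List<String> out = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(mods)
                    || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue; // public fields need no accessors; static/transient are skipped
            }
            String field = normalize(f.getName());
            boolean hasGetter = false;
            boolean hasSetter = false;
            for (Method m : clazz.getMethods()) { // public methods only
                String method = normalize(m.getName());
                if (method.equals("get" + field) && m.getParameterCount() == 0
                        && m.getReturnType().equals(f.getType())) {
                    hasGetter = true;
                }
                if (method.equals("set" + field) && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].equals(f.getType())) {
                    hasSetter = true;
                }
            }
            if (!hasGetter) out.add("no getter for field " + f.getName());
            if (!hasSetter) out.add("no setter for field " + f.getName());
        }
        return out;
    }

    public static void main(String[] args) {
        // prints "[no getter for field impressionId_, no setter for field impressionId_]"
        System.out.println(problems(LatestImpressionLike.class));
        // prints "[]"
        System.out.println(problems(PlainBean.class));
    }
}
```

Under these assumptions, an immutable message class with builder-only
mutation will always fail such a check, which is consistent with the
type being handled as a GenericType.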