ruanhang1993 edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-790247456


   > @ruanhang1993 Many thanks for finding issues of this PR.
   > I tried to solve two issues you raised.
   > 
   > 1. Regarding the first issue, I relocate protobuf package in pom.xml
   > 2. I'm not sure that open method will be called or not during checkpoint 
recovery process. @libenchao Could you help answer this question?
   
   @maosuhan  I am honored to receive your reply. I use the fixed version for 
problem 1 in flink 1.12.1 recently.
   In flink 1.12.1,  the problem 2 is gone. The `open` method will be called in 
flink 1.12.1 during checkpoint recovery process.
   
   With the fixed code for problem 1 in flink 1.12.1, I got the error like this:
   ```
   Caused by: java.lang.ClassCastException: 
com.google.protobuf.Descriptors$Descriptor cannot be cast to 
org.apache.flink.formats.protobuf.shaded.com.google.protobuf.Descriptors$Descriptor
           at 
org.apache.flink.formats.protobuf.PbFormatUtils.getDescriptor(PbFormatUtils.java:81)
 ~[?:?]
           at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.<init>(PbRowDataDeserializationSchema.java:67)
 ~[?:?]
           at 
org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:49)
 ~[?:?]
           at 
org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:31)
 ~[?:?]
           at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:427)
 ~[?:?]
           ......
   ```
   
   I generate the protobuf message class and package like this, and use it when 
submitting the flink job.
   ```bash
   protoc -I=$PROTO_FILE_PATH --java_out=$JAVA_OUTPUT/src $PROTO_FILE
   
javac -cp $JAVA_OUTPUT/libs/protobuf-java.jar -d $JAVA_OUTPUT/target 
$JAVA_OUTPUT/src/$PACKAGE_PATH/*.java


   cd $JAVA_OUTPUT

   jar -cvf $JAR_NAME -C $JAVA_OUTPUT/target/ .
   ```
   
   The problem is that the `getDescriptor` in my class return the 
`com.google.protobuf.Descriptors$Descriptor` class,  which can not cast to the 
relocated class.
   ```java
   public static Descriptors.Descriptor getDescriptor(String className) {
    
       try {
        
           Class<?> pbClass = Class.forName(className);
        
           return (Descriptors.Descriptor)
 
pbClass.getMethod(PbConstant.PB_METHOD_GET_DESCRIPTOR).invoke(null);
    
       } catch (Exception y) {
        
           throw new IllegalArgumentException(
String.format("get %s 
descriptors error!", className), y);
    
       }
   
}
   ```
   
   Do I need to relocate the class like you when generating the protobuf 
message class? 
   Or is there some other way to fix it?
   
   ps: The setting in 
`META-INF/services/org.apache.flink.table.factories.Factory` needs to be 
changed.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to