yufeng.sun created FLINK-35034:
----------------------------------

Summary: codegen compile error raised when use kafka connector and protobuf format
Key: FLINK-35034
URL: https://issues.apache.org/jira/browse/FLINK-35034
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.2
Reporter: yufeng.sun
I encountered the following error messages and stack trace when using Flink SQL with the Kafka connector and the protobuf format:
{code:java}
2024-03-23 23:23:38,852 ERROR org.apache.flink.formats.protobuf.util.PbCodegenUtils [] - Protobuf codegen compile error:
package org.apache.flink.formats.protobuf.deserialize;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericArrayData;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import com.google.protobuf.ByteString;
public class GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{
public static RowData decode(.UserProtoBuf.User message){
RowData rowData=null;
.UserProtoBuf.User message0 = message;
GenericRowData rowData0 = new GenericRowData(7);
Object elementDataVar1 = null;
elementDataVar1 = message0.getAge(); rowData0.setField(0, elementDataVar1);
Object elementDataVar2 = null;
elementDataVar2 = message0.getTimestamp(); rowData0.setField(1, elementDataVar2);
Object elementDataVar3 = null;
elementDataVar3 = message0.getEnabled(); rowData0.setField(2, elementDataVar3);
Object elementDataVar4 = null;
elementDataVar4 = message0.getHeight(); rowData0.setField(3, elementDataVar4);
Object elementDataVar5 = null;
elementDataVar5 = message0.getWeight(); rowData0.setField(4, elementDataVar5);
Object elementDataVar6 = null;
elementDataVar6 = BinaryStringData.fromString(message0.getUserName().toString()); rowData0.setField(5, elementDataVar6);
Object elementDataVar7 = null;
elementDataVar7 = BinaryStringData.fromString(message0.getFullAddress().toString()); rowData0.setField(6, elementDataVar7);
rowData = rowData0; return rowData;}}
2024-03-23 23:23:38,856 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0 (c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0) switched from INITIALIZING to FAILED with failure cause:
org.apache.flink.formats.protobuf.PbCodegenException: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
	at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
	at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
	at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]
Caused by: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
	at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:262) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
	at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
	... 14 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'
	at org.codehaus.janino.TokenStreamImpl.read(TokenStreamImpl.java:195) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.read(Parser.java:3313) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.parseQualifiedIdentifier(Parser.java:326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.parseReferenceType(Parser.java:2342) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.parseType(Parser.java:2326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.parseFormalParameter(Parser.java:1519) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.parseFormalParameters(Parser.java:1488) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1392) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:938) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.parseClassBody(Parser.java:736) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:642) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:370) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:241) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
	at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:259) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
	at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
	... 14 more
{code}

Proto file:
{code:java}
syntax = "proto3";
option java_outer_classname = "UserProtoBuf";

message User {
  int32 age = 1;
  int64 timestamp = 2;
  bool enabled = 3;
  float height = 4;
  double weight = 5;
  string userName = 6;
  string Full_Address = 7;
}
{code}

Flink SQL:
{code:java}
CREATE TEMPORARY TABLE test (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = '',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'org.example.UserProtoBuf$User',
  'protobuf.ignore-parse-errors' = 'true'
);
{code}

According to the error message, the type of the `message` parameter in the generated `decode` method has lost its package information:
{code:java}
public static RowData decode(.UserProtoBuf.User message){}
{code}

After analyzing the method calls listed below, I found that this exception occurs when neither `package` nor `option java_package` is specified in the proto file. In that case, the variable `javaPackageName` in method `getOuterProtoPrefix` is an empty string, and the generated type name starts with a bare `.`.

!https://intranetproxy.alipay.com/skylark/lark/0/2024/png/59256556/1712473173927-c277b275-08cc-4bb3-8322-f0c8937700b3.png!
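To make the failure mode concrete, here is a minimal, dependency-free Java sketch. It assumes, based only on the `.UserProtoBuf.User` visible in the generated code (not on Flink's source), that the prefix is built as `javaPackageName + "." + outerClassName + "."`. The class `OuterPrefixSketch` and its methods are hypothetical and are not Flink's or protobuf's API. It shows that an empty package yields the stray leading dot exactly at column 30 of the `decode` line, and one way a guard could avoid emitting it.

{code:java}
/**
 * Hypothetical sketch (not Flink's or protobuf's API) of how a top-level message's
 * Java type name might be assembled from the file's Java package and outer class name.
 */
class OuterPrefixSketch {

    // Assumed behavior: always joins with ".", so an empty package yields ".UserProtoBuf.".
    static String naivePrefix(String javaPackageName, String outerClassName) {
        return javaPackageName + "." + outerClassName + ".";
    }

    // Guarded variant: only emits the package and its trailing dot when the package is non-empty.
    static String guardedPrefix(String javaPackageName, String outerClassName) {
        String packagePart = javaPackageName.isEmpty() ? "" : javaPackageName + ".";
        return packagePart + outerClassName + ".";
    }

    // Same shape as the appendSegment(...) call quoted in the call chain of this report.
    static String decodeSignature(String fullMessageClassName) {
        return "public static RowData decode(" + fullMessageClassName + " message){";
    }

    public static void main(String[] args) {
        String javaPackageName = "";            // neither `package` nor `option java_package` is set
        String outerClassName = "UserProtoBuf"; // from `option java_outer_classname`

        String naive = decodeSignature(naivePrefix(javaPackageName, outerClassName) + "User");
        System.out.println(naive); // prints: public static RowData decode(.UserProtoBuf.User message){

        // 1-based column of the first '.', matching "Line 14, Column 30" in the CompileException.
        System.out.println(naive.indexOf('.') + 1); // prints: 30

        String guarded = decodeSignature(guardedPrefix(javaPackageName, outerClassName) + "User");
        System.out.println(guarded); // prints: public static RowData decode(UserProtoBuf.User message){
    }
}
{code}

The guarded variant only removes the stray leading dot; it is not presented as the upstream fix, and whether the emitted name then resolves also depends on which Java package the message class is actually generated into.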
{code:java}
org.apache.flink.formats.protobuf.util.PbCodegenUtils#compileClass
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter#ProtoToRowConverter
  - Class generatedClass = PbCodegenUtils.compileClass(Thread.currentThread().getContextClassLoader(), generatedPackageName + "." + generatedClassName, codegenAppender.code());
  - codegenAppender.appendSegment("public static RowData decode(" + fullMessageClassName + " message){");
  - String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
org.apache.flink.formats.protobuf.util.PbFormatUtils#getFullJavaName(com.google.protobuf.Descriptors.Descriptor)
org.apache.flink.formats.protobuf.util.PbFormatUtils#getOuterProtoPrefix
{code}

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)