This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c2e2b169661 [FLINK-38205][format][pb] Discard unknown fields by default (#26881)
c2e2b169661 is described below

commit c2e2b16966102d36a8dde02ef5627418a65aab79
Author: Zhanghao Chen <[email protected]>
AuthorDate: Mon Mar 23 19:02:57 2026 +0800

    [FLINK-38205][format][pb] Discard unknown fields by default (#26881)
---
 .../apache/flink/formats/protobuf/PbConstant.java  |  2 +-
 .../protobuf/deserialize/ProtoToRowConverter.java  | 10 +++-
 .../protobuf/ParseProtoWithUnknownFieldsTest.java  | 69 ++++++++++++++++++++++
 .../src/test/proto/test_map_truncated.proto        | 33 +++++++++++
 4 files changed, 110 insertions(+), 4 deletions(-)

diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
index df181d1a449..7aff91f0fb6 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
@@ -21,7 +21,7 @@ package org.apache.flink.formats.protobuf;
 /** Keeps protobuf constants separately. */
 public class PbConstant {
     public static final String PB_METHOD_GET_DESCRIPTOR = "getDescriptor";
-    public static final String PB_METHOD_PARSE_FROM = "parseFrom";
+    public static final String PB_METHOD_PARSER = "parser";
     public static final String GENERATED_DECODE_METHOD = "decode";
     public static final String GENERATED_ENCODE_METHOD = "encode";
     public static final String PB_MAP_KEY_NAME = "key";
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
index 730cc897a0d..b61cd09a192 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.types.logical.RowType;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
+import com.google.protobuf.DiscardUnknownFieldsParser;
+import com.google.protobuf.Parser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +54,8 @@ import java.util.UUID;
  */
 public class ProtoToRowConverter {
     private static final Logger LOG = LoggerFactory.getLogger(ProtoToRowConverter.class);
-    private final Method parseFromMethod;
     private final Method decodeMethod;
+    private final Parser<?> protoParser;
     private boolean isCodeSplit = false;
 
     public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
@@ -123,14 +125,16 @@ public class ProtoToRowConverter {
                             codegenAppender.code());
             decodeMethod =
                     generatedClass.getMethod(PbConstant.GENERATED_DECODE_METHOD, messageClass);
-            parseFromMethod = messageClass.getMethod(PbConstant.PB_METHOD_PARSE_FROM, byte[].class);
+            Method parserMethod = messageClass.getMethod(PbConstant.PB_METHOD_PARSER);
+            Parser originalProtoParser = (Parser) parserMethod.invoke(null);
+            protoParser = DiscardUnknownFieldsParser.wrap(originalProtoParser);
         } catch (Exception ex) {
             throw new PbCodegenException(ex);
         }
     }
 
     public RowData convertProtoBinaryToRow(byte[] data) throws Exception {
-        Object messageObj = parseFromMethod.invoke(null, data);
+        Object messageObj = protoParser.parseFrom(data);
         return (RowData) decodeMethod.invoke(null, messageObj);
     }
 
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ParseProtoWithUnknownFieldsTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ParseProtoWithUnknownFieldsTest.java
new file mode 100644
index 00000000000..ae75f139ed7
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ParseProtoWithUnknownFieldsTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.MapTest;
+import org.apache.flink.formats.protobuf.testproto.MapTestTruncated;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+
+import com.google.protobuf.ByteString;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test parse proto with unknown fields. */
+class ParseProtoWithUnknownFieldsTest {
+    @Test
+    void testSimple() throws Exception {
+        MapTest.InnerMessageTest innerMessageTest =
+                MapTest.InnerMessageTest.newBuilder().setA(1).setB(2).build();
+        MapTest mapTest =
+                MapTest.newBuilder()
+                        .setA(1)
+                        .putMap1("a", "b")
+                        .putMap1("c", "d")
+                        .putMap2("f", innerMessageTest)
+                        .putMap3("e", ByteString.copyFrom(new byte[] {1, 2, 3}))
+                        .build();
+
+        RowData row =
+                ProtobufTestHelper.pbBytesToRow(MapTestTruncated.class, mapTest.toByteArray());
+
+        // map3 is unknown in MapTestTruncated
+        assertThat(row.getArity()).isEqualTo(3);
+
+        // inspect field a
+        assertThat(row.getInt(0)).isEqualTo(1);
+
+        // inspect field map1
+        MapData map1 = row.getMap(1);
+        assertThat(map1.keyArray().getString(0)).hasToString("a");
+        assertThat(map1.valueArray().getString(0)).hasToString("b");
+        assertThat(map1.keyArray().getString(1)).hasToString("c");
+        assertThat(map1.valueArray().getString(1)).hasToString("d");
+
+        // inspect field map2
+        MapData map2 = row.getMap(2);
+        assertThat(map2.keyArray().getString(0)).hasToString("f");
+        RowData rowData2 = map2.valueArray().getRow(0, 2);
+        assertThat(rowData2.getInt(0)).isEqualTo(1);
+        assertThat(rowData2.getLong(1)).isEqualTo(2L);
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_map_truncated.proto b/flink-formats/flink-protobuf/src/test/proto/test_map_truncated.proto
new file mode 100644
index 00000000000..0acacea81c5
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/proto/test_map_truncated.proto
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+package org.apache.flink.formats.protobuf.proto;
+option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_multiple_files = true;
+
+message MapTestTruncated {
+  optional int32 a = 1;
+  map<string, string> map1 = 2;
+  map<string, InnerMessageTest> map2 = 3;
+
+  message InnerMessageTest{
+    optional int32 a = 1;
+    optional int64 b = 2;
+  }
+}

Reply via email to