This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c2e2b169661 [FLINK-38205][format][pb] Discard unknown fields by default (#26881)
c2e2b169661 is described below
commit c2e2b16966102d36a8dde02ef5627418a65aab79
Author: Zhanghao Chen <[email protected]>
AuthorDate: Mon Mar 23 19:02:57 2026 +0800
[FLINK-38205][format][pb] Discard unknown fields by default (#26881)
---
.../apache/flink/formats/protobuf/PbConstant.java | 2 +-
.../protobuf/deserialize/ProtoToRowConverter.java | 10 +++-
.../protobuf/ParseProtoWithUnknownFieldsTest.java | 69 ++++++++++++++++++++++
.../src/test/proto/test_map_truncated.proto | 33 +++++++++++
4 files changed, 110 insertions(+), 4 deletions(-)
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
index df181d1a449..7aff91f0fb6 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
@@ -21,7 +21,7 @@ package org.apache.flink.formats.protobuf;
/** Keeps protobuf constants separately. */
public class PbConstant {
public static final String PB_METHOD_GET_DESCRIPTOR = "getDescriptor";
- public static final String PB_METHOD_PARSE_FROM = "parseFrom";
+ public static final String PB_METHOD_PARSER = "parser";
public static final String GENERATED_DECODE_METHOD = "decode";
public static final String GENERATED_ENCODE_METHOD = "encode";
public static final String PB_MAP_KEY_NAME = "key";
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
index 730cc897a0d..b61cd09a192 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.types.logical.RowType;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
+import com.google.protobuf.DiscardUnknownFieldsParser;
+import com.google.protobuf.Parser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,8 +54,8 @@ import java.util.UUID;
*/
public class ProtoToRowConverter {
private static final Logger LOG =
LoggerFactory.getLogger(ProtoToRowConverter.class);
- private final Method parseFromMethod;
private final Method decodeMethod;
+ private final Parser<?> protoParser;
private boolean isCodeSplit = false;
public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
@@ -123,14 +125,16 @@ public class ProtoToRowConverter {
codegenAppender.code());
decodeMethod =
generatedClass.getMethod(PbConstant.GENERATED_DECODE_METHOD, messageClass);
- parseFromMethod =
messageClass.getMethod(PbConstant.PB_METHOD_PARSE_FROM, byte[].class);
+ Method parserMethod =
messageClass.getMethod(PbConstant.PB_METHOD_PARSER);
+ Parser originalProtoParser = (Parser) parserMethod.invoke(null);
+ protoParser = DiscardUnknownFieldsParser.wrap(originalProtoParser);
} catch (Exception ex) {
throw new PbCodegenException(ex);
}
}
public RowData convertProtoBinaryToRow(byte[] data) throws Exception {
- Object messageObj = parseFromMethod.invoke(null, data);
+ Object messageObj = protoParser.parseFrom(data);
return (RowData) decodeMethod.invoke(null, messageObj);
}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ParseProtoWithUnknownFieldsTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ParseProtoWithUnknownFieldsTest.java
new file mode 100644
index 00000000000..ae75f139ed7
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ParseProtoWithUnknownFieldsTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.MapTest;
+import org.apache.flink.formats.protobuf.testproto.MapTestTruncated;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+
+import com.google.protobuf.ByteString;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test parse proto with unknown fields. */
+class ParseProtoWithUnknownFieldsTest {
+ @Test
+ void testSimple() throws Exception {
+ MapTest.InnerMessageTest innerMessageTest =
+ MapTest.InnerMessageTest.newBuilder().setA(1).setB(2).build();
+ MapTest mapTest =
+ MapTest.newBuilder()
+ .setA(1)
+ .putMap1("a", "b")
+ .putMap1("c", "d")
+ .putMap2("f", innerMessageTest)
+ .putMap3("e", ByteString.copyFrom(new byte[] {1, 2,
3}))
+ .build();
+
+ RowData row =
+ ProtobufTestHelper.pbBytesToRow(MapTestTruncated.class,
mapTest.toByteArray());
+
+ // map3 is unknown in MapTestTruncated
+ assertThat(row.getArity()).isEqualTo(3);
+
+ // inspect field a
+ assertThat(row.getInt(0)).isEqualTo(1);
+
+ // inspect field map1
+ MapData map1 = row.getMap(1);
+ assertThat(map1.keyArray().getString(0)).hasToString("a");
+ assertThat(map1.valueArray().getString(0)).hasToString("b");
+ assertThat(map1.keyArray().getString(1)).hasToString("c");
+ assertThat(map1.valueArray().getString(1)).hasToString("d");
+
+ // inspect field map2
+ MapData map2 = row.getMap(2);
+ assertThat(map2.keyArray().getString(0)).hasToString("f");
+ RowData rowData2 = map2.valueArray().getRow(0, 2);
+ assertThat(rowData2.getInt(0)).isEqualTo(1);
+ assertThat(rowData2.getLong(1)).isEqualTo(2L);
+ }
+}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_map_truncated.proto b/flink-formats/flink-protobuf/src/test/proto/test_map_truncated.proto
new file mode 100644
index 00000000000..0acacea81c5
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/proto/test_map_truncated.proto
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+package org.apache.flink.formats.protobuf.proto;
+option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_multiple_files = true;
+
+message MapTestTruncated {
+ optional int32 a = 1;
+ map<string, string> map1 = 2;
+ map<string, InnerMessageTest> map2 = 3;
+
+ message InnerMessageTest{
+ optional int32 a = 1;
+ optional int64 b = 2;
+ }
+}