This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bf75870c474 [FLINK-33817][protobuf] Allow ReadDefaultValues = False 
for non primitive types on Proto3
bf75870c474 is described below

commit bf75870c4744c884860ee72bf464a301d18fb477
Author: dsaisharath <dsaishar...@uber.com>
AuthorDate: Fri Jan 5 16:03:52 2024 -0800

    [FLINK-33817][protobuf] Allow ReadDefaultValues = False for non primitive 
types on Proto3
    
    Close apache/flink#24035
---
 .../docs/connectors/table/formats/protobuf.md       |  7 +++++--
 .../flink/formats/protobuf/PbFormatContext.java     |  9 ++++++++-
 .../flink/formats/protobuf/PbFormatOptions.java     |  5 ++++-
 .../deserialize/PbCodegenRowDeserializer.java       | 14 ++++++++++----
 .../protobuf/deserialize/ProtoToRowConverter.java   | 15 ++++++---------
 .../protobuf/serialize/RowToProtoConverter.java     |  4 ++--
 .../apache/flink/formats/protobuf/Pb3ToRowTest.java | 21 +++++++--------------
 7 files changed, 42 insertions(+), 33 deletions(-)

diff --git a/docs/content/docs/connectors/table/formats/protobuf.md 
b/docs/content/docs/connectors/table/formats/protobuf.md
index b28cf49f9d3..72d4aae3c3d 100644
--- a/docs/content/docs/connectors/table/formats/protobuf.md
+++ b/docs/content/docs/connectors/table/formats/protobuf.md
@@ -149,9 +149,12 @@ Format Options
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-          This option only works if the generated class's version is proto2. 
If this value is set to true, the format will read empty values as the default 
values defined in the proto file.
+          If this value is set to true, the format will read empty values as 
the default values defined in the proto file.
           If the value is set to false, the format will generate null values 
if the data element does not exist in the binary protobuf message.
-          If the proto syntax is proto3, this value will forcibly be set to 
true, because proto3's standard is to use default values.
+          If proto syntax is proto3, users need to set this to true when using 
protobuf versions lower than 3.15, as older versions do not support 
+          checking for field presence, which can cause runtime compilation 
issues. Additionally, primitive types will be set to default values 
+          instead of null as field presence cannot be checked for them. Please 
be aware that setting this to true will cause the deserialization 
+          performance to be much slower depending on schema complexity and 
message size.
       </td>
     </tr>
     <tr>
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
index 9302cc274c3..fc2090e24de 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
@@ -28,9 +28,12 @@ import java.util.List;
 public class PbFormatContext {
     private final PbFormatConfig pbFormatConfig;
     private final List<String> splitMethodStack = new ArrayList<>();
+    private final boolean readDefaultValuesForPrimitiveTypes;
 
-    public PbFormatContext(PbFormatConfig pbFormatConfig) {
+    public PbFormatContext(
+            PbFormatConfig pbFormatConfig, boolean 
readDefaultValuesForPrimitiveTypes) {
         this.pbFormatConfig = pbFormatConfig;
+        this.readDefaultValuesForPrimitiveTypes = 
readDefaultValuesForPrimitiveTypes;
     }
 
     private String createSplitMethod(
@@ -73,4 +76,8 @@ public class PbFormatContext {
     public PbFormatConfig getPbFormatConfig() {
         return pbFormatConfig;
     }
+
+    public boolean getReadDefaultValuesForPrimitiveTypes() {
+        return readDefaultValuesForPrimitiveTypes;
+    }
 }
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java
index 1cf884f8314..18b8fc9618f 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java
@@ -44,7 +44,10 @@ public class PbFormatOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Optional flag to read as default values instead 
of null when some field does not exist in deserialization; default to false."
-                                    + "If proto syntax is proto3, this value 
will be set true forcibly because proto3's standard is to use default values.");
+                                    + "If proto syntax is proto3, users need 
to set this to true when using protobuf versions lower than 3.15 as older 
versions "
+                                    + "do not support checking for field 
presence which can cause runtime compilation issues. Additionally, primtive 
types "
+                                    + "will be set to default values instead 
of null as field presence cannot be checked for them. Please be aware that 
setting this"
+                                    + " to true will cause the deserialization 
performance to be much slower depending on schema complexity and message size");
     public static final ConfigOption<String> WRITE_NULL_STRING_LITERAL =
             ConfigOptions.key("write-null-string-literal")
                     .stringType()
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
index 541b1f83839..aa1e525ddd0 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
@@ -71,9 +71,15 @@ public class PbCodegenRowDeserializer implements 
PbCodegenDeserializer {
             PbCodegenDeserializer codegen =
                     PbCodegenDeserializeFactory.getPbCodegenDes(elementFd, 
subType, formatContext);
             splitAppender.appendLine("Object " + flinkRowEleVar + " = null");
-            if (!formatContext.getPbFormatConfig().isReadDefaultValues()) {
-                // only works in syntax=proto2 and readDefaultValues=false
-                // readDefaultValues must be true in pb3 mode
+            boolean readDefaultValues = 
formatContext.getPbFormatConfig().isReadDefaultValues();
+            if (PbFormatUtils.isSimpleType(subType)) {
+                readDefaultValues = 
formatContext.getReadDefaultValuesForPrimitiveTypes();
+            }
+
+            if (!readDefaultValues) {
+                // works for both syntax=proto2/proto3 and 
readDefaultValues=false for non-primitive
+                // types
+                // readDefaultValues must be true in pb3 mode for primitive 
types
                 String isMessageElementNonEmptyCode =
                         isMessageElementNonEmptyCode(
                                 pbMessageVar,
@@ -91,7 +97,7 @@ public class PbCodegenRowDeserializer implements 
PbCodegenDeserializer {
                     codegen.codegen(
                             flinkRowEleVar, pbGetMessageElementCode, 
splitAppender.currentIndent());
             splitAppender.appendSegment(code);
-            if (!formatContext.getPbFormatConfig().isReadDefaultValues()) {
+            if (!readDefaultValues) {
                 splitAppender.end("}");
             }
             splitAppender.appendLine(
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
index 2c132e892fb..29665f2a306 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
@@ -68,18 +68,15 @@ public class ProtoToRowConverter {
                             true,
                             Thread.currentThread().getContextClassLoader());
             String fullMessageClassName = 
PbFormatUtils.getFullJavaName(descriptor);
+            boolean readDefaultValuesForPrimitiveTypes = 
formatConfig.isReadDefaultValues();
             if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
-                // pb3 always read default values
-                formatConfig =
-                        new PbFormatConfig(
-                                formatConfig.getMessageClassName(),
-                                formatConfig.isIgnoreParseErrors(),
-                                true,
-                                formatConfig.getWriteNullStringLiterals());
+                // pb3 always read default values for primitive types
+                readDefaultValuesForPrimitiveTypes = true;
             }
             PbCodegenAppender codegenAppender = new PbCodegenAppender();
-            PbFormatContext pbFormatContext = new 
PbFormatContext(formatConfig);
-            String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
+            PbFormatContext pbFormatContext =
+                    new PbFormatContext(formatConfig, 
readDefaultValuesForPrimitiveTypes);
+            String uuid = UUID.randomUUID().toString().replace("-", "");
             String generatedClassName = "GeneratedProtoToRow_" + uuid;
             String generatedPackageName = 
ProtoToRowConverter.class.getPackage().getName();
             codegenAppender.appendLine("package " + generatedPackageName);
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
index d7a3ef48e41..b0e23d08419 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
@@ -59,10 +59,10 @@ public class RowToProtoConverter {
         try {
             Descriptors.Descriptor descriptor =
                     
PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
-            PbFormatContext formatContext = new PbFormatContext(formatConfig);
+            PbFormatContext formatContext = new PbFormatContext(formatConfig, 
false);
 
             PbCodegenAppender codegenAppender = new PbCodegenAppender(0);
-            String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
+            String uuid = UUID.randomUUID().toString().replace("-", "");
             String generatedClassName = "GeneratedRowToProto_" + uuid;
             String generatedPackageName = 
RowToProtoConverter.class.getPackage().getName();
             codegenAppender.appendLine("package " + generatedPackageName);
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/Pb3ToRowTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/Pb3ToRowTest.java
index 960b215ad4c..7dea8ccdda4 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/Pb3ToRowTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/Pb3ToRowTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test conversion of proto3 data to flink internal data. Default values after 
conversion is tested
@@ -92,17 +93,14 @@ public class Pb3ToRowTest {
         Pb3Test pb3Test = Pb3Test.newBuilder().build();
         RowData row = ProtobufTestHelper.pbBytesToRow(Pb3Test.class, 
pb3Test.toByteArray());
 
+        // primitive types should have default values
         assertFalse(row.isNullAt(0));
         assertFalse(row.isNullAt(1));
         assertFalse(row.isNullAt(2));
         assertFalse(row.isNullAt(3));
         assertFalse(row.isNullAt(4));
         assertFalse(row.isNullAt(5));
-        assertFalse(row.isNullAt(6));
-        assertFalse(row.isNullAt(7));
         assertFalse(row.isNullAt(8));
-        assertFalse(row.isNullAt(9));
-        assertFalse(row.isNullAt(10));
 
         assertEquals(0, row.getInt(0));
         assertEquals(0L, row.getLong(1));
@@ -110,16 +108,11 @@ public class Pb3ToRowTest {
         assertEquals(Float.valueOf(0.0f), Float.valueOf(row.getFloat(3)));
         assertEquals(Double.valueOf(0.0d), Double.valueOf(row.getDouble(4)));
         assertEquals("UNIVERSAL", row.getString(5).toString());
-
-        RowData rowData = row.getRow(6, 2);
-        assertEquals(0, rowData.getInt(0));
-        assertEquals(0L, rowData.getLong(1));
-
-        assertEquals(0, row.getArray(7).size());
-
         assertEquals(0, row.getBinary(8).length);
-
-        assertEquals(0, row.getMap(9).size());
-        assertEquals(0, row.getMap(10).size());
+        // non-primitive types should be null
+        assertTrue(row.isNullAt(6));
+        assertTrue(row.isNullAt(7));
+        assertTrue(row.isNullAt(9));
+        assertTrue(row.isNullAt(10));
     }
 }

Reply via email to