This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a2ec4c3b8bd [FLINK-32650][protobuf] Split generated code in protobuf format to mitigate huge method problems
a2ec4c3b8bd is described below

commit a2ec4c3b8bd9f2009fd721ab4c51afedcd8574fb
Author: lijingwei.5018 <lijingwei.5...@bytedance.com>
AuthorDate: Mon Nov 13 11:32:43 2023 +0800

    [FLINK-32650][protobuf] Split generated code in protobuf format to mitigate huge method problems
    
    Close apache/flink#23162
---
 .../apache/flink/formats/protobuf/PbConstant.java  |   6 +
 .../flink/formats/protobuf/PbFormatContext.java    |  44 ++++++
 .../deserialize/PbCodegenRowDeserializer.java      |  30 +++-
 .../PbRowDataDeserializationSchema.java            |   6 +
 .../protobuf/deserialize/ProtoToRowConverter.java  |  13 ++
 .../protobuf/serialize/PbCodegenRowSerializer.java |  37 ++++-
 .../serialize/PbRowDataSerializationSchema.java    |   6 +
 .../protobuf/serialize/RowToProtoConverter.java    |  13 ++
 .../formats/protobuf/util/PbCodegenUtils.java      |   4 +
 .../formats/protobuf/BigPbProtoToRowTest.java      | 140 +++++++++++++++++
 .../formats/protobuf/BigPbRowToProtoTest.java      | 166 +++++++++++++++++++++
 .../src/test/proto/test_big_pb.proto               |  61 ++++++++
 12 files changed, 512 insertions(+), 14 deletions(-)

diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
index ea7d6514c56..4a39794e68d 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
@@ -27,4 +27,10 @@ public class PbConstant {
     public static final String PB_MAP_KEY_NAME = "key";
     public static final String PB_MAP_VALUE_NAME = "value";
     public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
+    /**
+     * The JIT compiler does not compile methods larger than 8K, and one char of generated code
+     * is assumed to take about one byte, so a conservative 4K split threshold is used to keep
+     * several RowType element code segments from being combined to exceed 8K.
+     */
+    public static final int CODEGEN_SPLIT_THRESHOLD = 4000;
 }
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
index 27ceb0fb49d..9302cc274c3 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
@@ -18,14 +18,58 @@
 
 package org.apache.flink.formats.protobuf;
 
+import org.apache.flink.formats.protobuf.util.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.util.PbCodegenVarId;
+
+import java.util.ArrayList;
+import java.util.List;
+
 /** store config and common information. */
 public class PbFormatContext {
     private final PbFormatConfig pbFormatConfig;
+    private final List<String> splitMethodStack = new ArrayList<>();
 
     public PbFormatContext(PbFormatConfig pbFormatConfig) {
         this.pbFormatConfig = pbFormatConfig;
     }
 
+    private String createSplitMethod(
+            String rowDataType,
+            String rowDataVar,
+            String messageTypeStr,
+            String messageTypeVar,
+            String code) {
+        int uid = PbCodegenVarId.getInstance().getAndIncrement();
+        String splitMethodName = "split" + uid;
+        PbCodegenAppender pbCodegenAppender = new PbCodegenAppender();
+        pbCodegenAppender.appendSegment(
+                String.format(
+                        "private static void %s (%s %s, %s %s) {\n %s \n}",
+                        splitMethodName,
+                        rowDataType,
+                        rowDataVar,
+                        messageTypeStr,
+                        messageTypeVar,
+                        code));
+        splitMethodStack.add(pbCodegenAppender.code());
+        return String.format("%s(%s, %s);", splitMethodName, rowDataVar, 
messageTypeVar);
+    }
+
+    public String splitDeserializerRowTypeMethod(
+            String rowDataVar, String messageTypeStr, String messageTypeVar, 
String code) {
+        return createSplitMethod(
+                "GenericRowData", rowDataVar, messageTypeStr, messageTypeVar, 
code);
+    }
+
+    public String splitSerializerRowTypeMethod(
+            String rowDataVar, String messageTypeStr, String messageTypeVar, 
String code) {
+        return createSplitMethod("RowData", rowDataVar, messageTypeStr, 
messageTypeVar, code);
+    }
+
+    public List<String> getSplitMethodStack() {
+        return splitMethodStack;
+    }
+
     public PbFormatConfig getPbFormatConfig() {
         return pbFormatConfig;
     }
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
index d1fc0c60726..f3cf414f540 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.protobuf.deserialize;
 import org.apache.flink.formats.protobuf.PbCodegenException;
 import org.apache.flink.formats.protobuf.PbFormatContext;
 import org.apache.flink.formats.protobuf.util.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.util.PbCodegenUtils;
 import org.apache.flink.formats.protobuf.util.PbCodegenVarId;
 import org.apache.flink.formats.protobuf.util.PbFormatUtils;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -59,6 +60,7 @@ public class PbCodegenRowDeserializer implements 
PbCodegenDeserializer {
         appender.appendLine(
                 "GenericRowData " + flinkRowDataVar + " = new GenericRowData(" 
+ fieldSize + ")");
         int index = 0;
+        PbCodegenAppender splitAppender = new PbCodegenAppender(indent);
         for (String fieldName : rowType.getFieldNames()) {
             int subUid = varUid.getAndIncrement();
             String flinkRowEleVar = "elementDataVar" + subUid;
@@ -68,7 +70,7 @@ public class PbCodegenRowDeserializer implements 
PbCodegenDeserializer {
             String strongCamelFieldName = 
PbFormatUtils.getStrongCamelCaseJsonName(fieldName);
             PbCodegenDeserializer codegen =
                     PbCodegenDeserializeFactory.getPbCodegenDes(elementFd, 
subType, formatContext);
-            appender.appendLine("Object " + flinkRowEleVar + " = null");
+            splitAppender.appendLine("Object " + flinkRowEleVar + " = null");
             if (!formatContext.getPbFormatConfig().isReadDefaultValues()) {
                 // only works in syntax=proto2 and readDefaultValues=false
                 // readDefaultValues must be true in pb3 mode
@@ -77,7 +79,7 @@ public class PbCodegenRowDeserializer implements 
PbCodegenDeserializer {
                                 pbMessageVar,
                                 strongCamelFieldName,
                                 PbFormatUtils.isRepeatedType(subType));
-                appender.begin("if(" + isMessageElementNonEmptyCode + "){");
+                splitAppender.begin("if(" + isMessageElementNonEmptyCode + 
"){");
             }
             String pbGetMessageElementCode =
                     pbGetMessageElementCode(
@@ -87,15 +89,31 @@ public class PbCodegenRowDeserializer implements 
PbCodegenDeserializer {
                             PbFormatUtils.isArrayType(subType));
             String code =
                     codegen.codegen(
-                            flinkRowEleVar, pbGetMessageElementCode, 
appender.currentIndent());
-            appender.appendSegment(code);
+                            flinkRowEleVar, pbGetMessageElementCode, 
splitAppender.currentIndent());
+            splitAppender.appendSegment(code);
             if (!formatContext.getPbFormatConfig().isReadDefaultValues()) {
-                appender.end("}");
+                splitAppender.end("}");
             }
-            appender.appendLine(
+            splitAppender.appendLine(
                     flinkRowDataVar + ".setField(" + index + ", " + 
flinkRowEleVar + ")");
+            if (PbCodegenUtils.needToSplit(splitAppender.code().length())) {
+                String splitMethod =
+                        formatContext.splitDeserializerRowTypeMethod(
+                                flinkRowDataVar,
+                                pbMessageTypeStr,
+                                pbMessageVar,
+                                splitAppender.code());
+                appender.appendSegment(splitMethod);
+                splitAppender = new PbCodegenAppender();
+            }
             index += 1;
         }
+        if (!splitAppender.code().isEmpty()) {
+            String splitMethod =
+                    formatContext.splitDeserializerRowTypeMethod(
+                            flinkRowDataVar, pbMessageTypeStr, pbMessageVar, 
splitAppender.code());
+            appender.appendSegment(splitMethod);
+        }
         appender.appendLine(resultVar + " = " + flinkRowDataVar);
         return appender.code();
     }
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java
index 5d3872713d2..4225c6390c6 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.protobuf.deserialize;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.protobuf.PbFormatConfig;
@@ -76,6 +77,11 @@ public class PbRowDataDeserializationSchema implements 
DeserializationSchema<Row
         }
     }
 
+    @VisibleForTesting
+    public boolean isCodeSplit() {
+        return protoToRowConverter.isCodeSplit();
+    }
+
     @Override
     public boolean isEndOfStream(RowData nextElement) {
         return false;
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
index 4564efce2a4..2c132e892fb 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.protobuf.deserialize;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.formats.protobuf.PbCodegenException;
 import org.apache.flink.formats.protobuf.PbConstant;
 import org.apache.flink.formats.protobuf.PbFormatConfig;
@@ -54,6 +55,7 @@ public class ProtoToRowConverter {
     private static final Logger LOG = 
LoggerFactory.getLogger(ProtoToRowConverter.class);
     private final Method parseFromMethod;
     private final Method decodeMethod;
+    private boolean isCodeSplit = false;
 
     public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
             throws PbCodegenException {
@@ -108,6 +110,12 @@ public class ProtoToRowConverter {
             codegenAppender.appendSegment(genCode);
             codegenAppender.appendLine("return rowData");
             codegenAppender.appendSegment("}");
+            if (!pbFormatContext.getSplitMethodStack().isEmpty()) {
+                isCodeSplit = true;
+                for (String splitMethod : 
pbFormatContext.getSplitMethodStack()) {
+                    codegenAppender.appendSegment(splitMethod);
+                }
+            }
             codegenAppender.appendSegment("}");
 
             String printCode = codegenAppender.printWithLineNumber();
@@ -129,4 +137,9 @@ public class ProtoToRowConverter {
         Object messageObj = parseFromMethod.invoke(null, data);
         return (RowData) decodeMethod.invoke(null, messageObj);
     }
+
+    @VisibleForTesting
+    protected boolean isCodeSplit() {
+        return isCodeSplit;
+    }
 }
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
index 083a628b70d..342117cd272 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
@@ -63,6 +63,7 @@ public class PbCodegenRowSerializer implements 
PbCodegenSerializer {
                         + pbMessageTypeStr
                         + ".newBuilder()");
         int index = 0;
+        PbCodegenAppender splitAppender = new PbCodegenAppender(indent);
         for (String fieldName : rowType.getFieldNames()) {
             Descriptors.FieldDescriptor elementFd = 
descriptor.findFieldByName(fieldName);
             LogicalType subType = 
rowType.getTypeAt(rowType.getFieldIndex(fieldName));
@@ -80,17 +81,18 @@ public class PbCodegenRowSerializer implements 
PbCodegenSerializer {
 
             // Only set non-null element of flink row to proto object. The 
real value in proto
             // result depends on protobuf implementation.
-            appender.begin("if(!" + flinkRowDataVar + ".isNullAt(" + index + 
")){");
-            appender.appendLine(elementPbTypeStr + " " + elementPbVar);
+            splitAppender.begin("if(!" + flinkRowDataVar + ".isNullAt(" + 
index + ")){");
+            splitAppender.appendLine(elementPbTypeStr + " " + elementPbVar);
             String flinkRowElementCode =
                     PbCodegenUtils.flinkContainerElementCode(flinkRowDataVar, 
index + "", subType);
             PbCodegenSerializer codegen =
                     PbCodegenSerializeFactory.getPbCodegenSer(elementFd, 
subType, formatContext);
             String code =
-                    codegen.codegen(elementPbVar, flinkRowElementCode, 
appender.currentIndent());
-            appender.appendSegment(code);
+                    codegen.codegen(
+                            elementPbVar, flinkRowElementCode, 
splitAppender.currentIndent());
+            splitAppender.appendSegment(code);
             if (subType.getTypeRoot() == LogicalTypeRoot.ARRAY) {
-                appender.appendLine(
+                splitAppender.appendLine(
                         messageBuilderVar
                                 + ".addAll"
                                 + strongCamelFieldName
@@ -98,7 +100,7 @@ public class PbCodegenRowSerializer implements 
PbCodegenSerializer {
                                 + elementPbVar
                                 + ")");
             } else if (subType.getTypeRoot() == LogicalTypeRoot.MAP) {
-                appender.appendLine(
+                splitAppender.appendLine(
                         messageBuilderVar
                                 + ".putAll"
                                 + strongCamelFieldName
@@ -106,7 +108,7 @@ public class PbCodegenRowSerializer implements 
PbCodegenSerializer {
                                 + elementPbVar
                                 + ")");
             } else {
-                appender.appendLine(
+                splitAppender.appendLine(
                         messageBuilderVar
                                 + ".set"
                                 + strongCamelFieldName
@@ -114,9 +116,28 @@ public class PbCodegenRowSerializer implements 
PbCodegenSerializer {
                                 + elementPbVar
                                 + ")");
             }
-            appender.end("}");
+            splitAppender.end("}");
+            if (PbCodegenUtils.needToSplit(splitAppender.code().length())) {
+                String splitMethod =
+                        formatContext.splitSerializerRowTypeMethod(
+                                flinkRowDataVar,
+                                pbMessageTypeStr + ".Builder",
+                                messageBuilderVar,
+                                splitAppender.code());
+                appender.appendSegment(splitMethod);
+                splitAppender = new PbCodegenAppender();
+            }
             index += 1;
         }
+        if (!splitAppender.code().isEmpty()) {
+            String splitMethod =
+                    formatContext.splitSerializerRowTypeMethod(
+                            flinkRowDataVar,
+                            pbMessageTypeStr + ".Builder",
+                            messageBuilderVar,
+                            splitAppender.code());
+            appender.appendSegment(splitMethod);
+        }
         appender.appendLine(resultVar + " = " + messageBuilderVar + 
".build()");
         return appender.code();
     }
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java
index 24484dd71b4..b268e148f8e 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.protobuf.serialize;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.formats.protobuf.PbFormatConfig;
 import org.apache.flink.formats.protobuf.util.PbFormatUtils;
@@ -55,6 +56,11 @@ public class PbRowDataSerializationSchema implements 
SerializationSchema<RowData
         rowToProtoConverter = new RowToProtoConverter(rowType, pbFormatConfig);
     }
 
+    @VisibleForTesting
+    public boolean isCodeSplit() {
+        return rowToProtoConverter.isCodeSplit();
+    }
+
     @Override
     public byte[] serialize(RowData element) {
         try {
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
index df701a515a2..d7a3ef48e41 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.protobuf.serialize;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.formats.protobuf.PbCodegenException;
 import org.apache.flink.formats.protobuf.PbConstant;
 import org.apache.flink.formats.protobuf.PbFormatConfig;
@@ -51,6 +52,7 @@ import java.util.UUID;
 public class RowToProtoConverter {
     private static final Logger LOG = 
LoggerFactory.getLogger(ProtoToRowConverter.class);
     private final Method encodeMethod;
+    private boolean isCodeSplit = false;
 
     public RowToProtoConverter(RowType rowType, PbFormatConfig formatConfig)
             throws PbCodegenException {
@@ -89,6 +91,12 @@ public class RowToProtoConverter {
             codegenAppender.appendSegment(genCode);
             codegenAppender.appendLine("return message");
             codegenAppender.end("}");
+            if (!formatContext.getSplitMethodStack().isEmpty()) {
+                isCodeSplit = true;
+                for (String spliteMethod : 
formatContext.getSplitMethodStack()) {
+                    codegenAppender.appendSegment(spliteMethod);
+                }
+            }
             codegenAppender.end("}");
 
             String printCode = codegenAppender.printWithLineNumber();
@@ -109,4 +117,9 @@ public class RowToProtoConverter {
         AbstractMessage message = (AbstractMessage) encodeMethod.invoke(null, 
rowData);
         return message.toByteArray();
     }
+
+    @VisibleForTesting
+    protected boolean isCodeSplit() {
+        return isCodeSplit;
+    }
 }
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
index 042392e302b..87643aac72c 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
@@ -264,4 +264,8 @@ public class PbCodegenUtils {
         }
         return simpleCompiler.getClassLoader().loadClass(className);
     }
+
+    public static boolean needToSplit(int noSplitCodeSize) {
+        return noSplitCodeSize >= PbConstant.CODEGEN_SPLIT_THRESHOLD;
+    }
 }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java
new file mode 100644
index 00000000000..ad0ee2efae3
--- /dev/null
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema;
+import org.apache.flink.formats.protobuf.testproto.BigPbClass;
+import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.ByteString;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Tests for a huge proto definition, which may trigger special optimizations such as code
+ * splitting.
+ */
+public class BigPbProtoToRowTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        BigPbClass.BigPbMessage bigPbMessage =
+                BigPbClass.BigPbMessage.newBuilder()
+                        .setIntField1(5)
+                        .setBoolField2(false)
+                        .setStringField3("test1")
+                        .setBytesField4(ByteString.copyFrom(new byte[] {1, 2, 
3}))
+                        .setDoubleField5(2.5)
+                        .setFloatField6(1.5F)
+                        .setUint32Field7(3)
+                        .setInt64Field8(7L)
+                        .setUint64Field9(9L)
+                        .setBytesField10(ByteString.copyFrom(new byte[] {4, 5, 
6}))
+                        .setDoubleField11(6.5)
+                        .setBytesField12(ByteString.copyFrom(new byte[] {7, 8, 
9}))
+                        .setBoolField13(true)
+                        .setStringField14("test2")
+                        .setFloatField15(3.5F)
+                        .setInt32Field16(8)
+                        .setBytesField17(ByteString.copyFrom(new byte[] {10, 
11, 12}))
+                        .setBoolField18(true)
+                        .setStringField19("test3")
+                        .setFloatField20(4.5F)
+                        .setFixed32Field21(1)
+                        .setFixed64Field22(2L)
+                        .setSfixed32Field23(3)
+                        .setSfixed64Field24(4L)
+                        .setDoubleField25(5.5)
+                        .setUint32Field26(6)
+                        .setUint64Field27(7L)
+                        .setBoolField28(true)
+                        .addField29("value1")
+                        .addField29("value2")
+                        .addField29("value3")
+                        .setFloatField30(8.5F)
+                        .setStringField31("test4")
+                        .putMapField32("key1", ByteString.copyFrom(new byte[] 
{13, 14, 15}))
+                        .putMapField32("key2", ByteString.copyFrom(new byte[] 
{16, 17, 18}))
+                        .putMapField33("key1", "value1")
+                        .putMapField33("key2", "value2")
+                        .build();
+
+        RowData row =
+                ProtobufTestHelper.pbBytesToRow(
+                        BigPbClass.BigPbMessage.class, 
bigPbMessage.toByteArray());
+
+        assertEquals(5, row.getInt(0));
+        assertFalse(row.getBoolean(1));
+        assertEquals("test1", row.getString(2).toString());
+        assertArrayEquals(new byte[] {1, 2, 3}, row.getBinary(3));
+        assertEquals(2.5, row.getDouble(4), 0.0);
+        assertEquals(1.5F, row.getFloat(5), 0.0);
+        assertEquals(3, row.getInt(6));
+        assertEquals(7L, row.getLong(7));
+        assertEquals(9L, row.getLong(8));
+        assertArrayEquals(new byte[] {4, 5, 6}, row.getBinary(9));
+        assertEquals(6.5, row.getDouble(10), 0.0);
+        assertArrayEquals(new byte[] {7, 8, 9}, row.getBinary(11));
+        assertTrue(row.getBoolean(12));
+        assertEquals("test2", row.getString(13).toString());
+        assertEquals(3.5F, row.getFloat(14), 0.0);
+        assertEquals(8, row.getInt(15));
+        assertArrayEquals(new byte[] {10, 11, 12}, row.getBinary(16));
+        assertTrue(row.getBoolean(17));
+        assertEquals("test3", row.getString(18).toString());
+        assertEquals(4.5F, row.getFloat(19), 0.0);
+        assertEquals(1, row.getInt(20));
+        assertEquals(2L, row.getLong(21));
+        assertEquals(3, row.getInt(22));
+        assertEquals(4L, row.getLong(23));
+        assertEquals(5.5, row.getDouble(24), 0.0);
+        assertEquals(6, row.getInt(25));
+        assertEquals(7L, row.getLong(26));
+        assertTrue(row.getBoolean(27));
+        assertEquals("value1", row.getArray(28).getString(0).toString());
+        assertEquals("value2", row.getArray(28).getString(1).toString());
+        assertEquals("value3", row.getArray(28).getString(2).toString());
+        assertEquals(8.5F, row.getFloat(29), 0.0);
+        assertEquals("test4", row.getString(30).toString());
+        assertArrayEquals(new byte[] {13, 14, 15}, 
row.getMap(31).valueArray().getBinary(0));
+        assertArrayEquals(new byte[] {16, 17, 18}, 
row.getMap(31).valueArray().getBinary(1));
+        assertEquals("value1", 
row.getMap(32).valueArray().getString(0).toString());
+        assertEquals("value2", 
row.getMap(32).valueArray().getString(1).toString());
+    }
+
+    @Test
+    public void testSplitInDeserialization() throws Exception {
+        RowType rowType = 
PbToRowTypeUtil.generateRowType(BigPbClass.BigPbMessage.getDescriptor());
+        PbFormatConfig formatConfig =
+                new PbFormatConfig(BigPbClass.BigPbMessage.class.getName(), 
false, false, "");
+        PbRowDataDeserializationSchema pbRowDataDeserializationSchema =
+                new PbRowDataDeserializationSchema(
+                        rowType, InternalTypeInfo.of(rowType), formatConfig);
+        pbRowDataDeserializationSchema.open(null);
+        // make sure code is split
+        assertTrue(pbRowDataDeserializationSchema.isCodeSplit());
+    }
+}
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java
new file mode 100644
index 00000000000..2c9094db384
--- /dev/null
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import 
org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
+import org.apache.flink.formats.protobuf.testproto.BigPbClass;
+import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test for huge proto definition, which may trigger some special 
optimizations such as code
+ * splitting.
+ */
+public class BigPbRowToProtoTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        GenericRowData rowData = new GenericRowData(33);
+        rowData.setField(0, 20);
+        rowData.setField(1, false);
+        rowData.setField(2, StringData.fromString("test1"));
+        rowData.setField(3, new byte[] {1, 2, 3});
+        rowData.setField(4, 2.5);
+        rowData.setField(5, 1.5F);
+        rowData.setField(6, 3);
+        rowData.setField(7, 7L);
+        rowData.setField(8, 9L);
+        rowData.setField(9, new byte[] {4, 5, 6});
+        rowData.setField(10, 6.5);
+        rowData.setField(11, new byte[] {7, 8, 9});
+        rowData.setField(12, true);
+        rowData.setField(13, StringData.fromString("test2"));
+        rowData.setField(14, 3.5F);
+        rowData.setField(15, 8);
+        rowData.setField(16, new byte[] {10, 11, 12});
+        rowData.setField(17, true);
+        rowData.setField(18, StringData.fromString("test3"));
+        rowData.setField(19, 4.5F);
+        rowData.setField(20, 1);
+        rowData.setField(21, 2L);
+        rowData.setField(22, 3);
+        rowData.setField(23, 4L);
+        rowData.setField(24, 5.5);
+        rowData.setField(25, 6);
+        rowData.setField(26, 7L);
+        rowData.setField(27, true);
+        rowData.setField(
+                28,
+                new GenericArrayData(
+                        new StringData[] {
+                            StringData.fromString("value1"),
+                            StringData.fromString("value2"),
+                            StringData.fromString("value3")
+                        }));
+        rowData.setField(29, 3.5F);
+        rowData.setField(30, StringData.fromString("test4"));
+        rowData.setField(31, null);
+        Map<StringData, StringData> map1 =
+                new HashMap() {
+                    {
+                        put(StringData.fromString("key1"), 
StringData.fromString("value1"));
+                        put(StringData.fromString("key2"), 
StringData.fromString("value2"));
+                        put(StringData.fromString("key3"), 
StringData.fromString("value3"));
+                    }
+                };
+        rowData.setField(32, new GenericMapData(map1));
+
+        byte[] bytes = ProtobufTestHelper.rowToPbBytes(rowData, 
BigPbClass.BigPbMessage.class);
+
+        BigPbClass.BigPbMessage bigPbMessage = 
BigPbClass.BigPbMessage.parseFrom(bytes);
+        assertEquals(rowData.getField(0), bigPbMessage.getIntField1());
+        assertEquals(rowData.getField(1), bigPbMessage.getBoolField2());
+        assertEquals(rowData.getField(2).toString(), 
bigPbMessage.getStringField3());
+        assertArrayEquals(
+                ((byte[]) rowData.getField(3)), 
bigPbMessage.getBytesField4().toByteArray());
+        assertEquals(rowData.getField(4), bigPbMessage.getDoubleField5());
+        assertEquals(rowData.getField(5), bigPbMessage.getFloatField6());
+        assertEquals(rowData.getField(6), bigPbMessage.getUint32Field7());
+        assertEquals(rowData.getField(7), bigPbMessage.getInt64Field8());
+        assertEquals(rowData.getField(8), bigPbMessage.getUint64Field9());
+        assertArrayEquals(
+                ((byte[]) rowData.getField(9)), 
bigPbMessage.getBytesField10().toByteArray());
+        assertEquals(rowData.getField(10), bigPbMessage.getDoubleField11());
+        assertArrayEquals(
+                ((byte[]) rowData.getField(11)), 
bigPbMessage.getBytesField12().toByteArray());
+        assertEquals(rowData.getField(12), bigPbMessage.getBoolField13());
+        assertEquals(rowData.getField(13).toString(), 
bigPbMessage.getStringField14());
+        assertEquals(rowData.getField(14), bigPbMessage.getFloatField15());
+        assertEquals(rowData.getField(15), bigPbMessage.getInt32Field16());
+        assertArrayEquals(
+                ((byte[]) rowData.getField(16)), 
bigPbMessage.getBytesField17().toByteArray());
+        assertEquals(rowData.getField(17), bigPbMessage.getBoolField18());
+        assertEquals(rowData.getField(18).toString(), 
bigPbMessage.getStringField19());
+        assertEquals(rowData.getField(19), bigPbMessage.getFloatField20());
+        assertEquals(rowData.getField(20), bigPbMessage.getFixed32Field21());
+        assertEquals(rowData.getField(21), bigPbMessage.getFixed64Field22());
+        assertEquals(rowData.getField(22), bigPbMessage.getSfixed32Field23());
+        assertEquals(rowData.getField(23), bigPbMessage.getSfixed64Field24());
+        assertEquals(rowData.getField(24), bigPbMessage.getDoubleField25());
+        assertEquals(rowData.getField(25), bigPbMessage.getUint32Field26());
+        assertEquals(rowData.getField(26), bigPbMessage.getUint64Field27());
+        assertEquals(rowData.getField(27), bigPbMessage.getBoolField28());
+        assertEquals(
+                ((GenericArrayData) 
rowData.getField(28)).getString(0).toString(),
+                bigPbMessage.getField29List().get(0));
+        assertEquals(
+                ((GenericArrayData) 
rowData.getField(28)).getString(1).toString(),
+                bigPbMessage.getField29List().get(1));
+        assertEquals(
+                ((GenericArrayData) 
rowData.getField(28)).getString(2).toString(),
+                bigPbMessage.getField29List().get(2));
+        assertEquals(rowData.getField(29), bigPbMessage.getFloatField30());
+        assertEquals(rowData.getField(30).toString(), 
bigPbMessage.getStringField31());
+
+        ArrayData keySet = rowData.getMap(32).keyArray();
+        ArrayData valueSet = rowData.getMap(32).valueArray();
+        assertEquals(keySet.getString(0).toString(), "key2");
+        assertEquals(keySet.getString(1).toString(), "key3");
+        assertEquals(keySet.getString(2).toString(), "key1");
+        assertEquals(valueSet.getString(0).toString(), "value2");
+        assertEquals(valueSet.getString(1).toString(), "value3");
+        assertEquals(valueSet.getString(2).toString(), "value1");
+    }
+
+    @Test
+    public void testSplitInSerialization() throws Exception {
+        RowType rowType = 
PbToRowTypeUtil.generateRowType(BigPbClass.BigPbMessage.getDescriptor());
+        PbFormatConfig formatConfig =
+                new PbFormatConfig(BigPbClass.BigPbMessage.class.getName(), 
false, false, "");
+        PbRowDataSerializationSchema pbRowDataSerializationSchema =
+                new PbRowDataSerializationSchema(rowType, formatConfig);
+        pbRowDataSerializationSchema.open(null);
+        // make sure code is split
+        assertTrue(pbRowDataSerializationSchema.isCodeSplit());
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_big_pb.proto b/flink-formats/flink-protobuf/src/test/proto/test_big_pb.proto
new file mode 100644
index 00000000000..85207c302f3
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/proto/test_big_pb.proto
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.formats.protobuf.testproto;
+
+option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_outer_classname = "BigPbClass";
+
+// Deliberately wide message used by BigPbProtoToRowTest and BigPbRowToProtoTest as a "huge
+// proto definition" that may trigger special optimizations such as code splitting.
+// In those tests, proto field number N corresponds to Flink row position N - 1.
+// NOTE(review): renaming or renumbering fields changes the generated Java accessors
+// (e.g. getIntField1, getField29List) that the tests call.
+message BigPbMessage {
+  int32 int_field_1 = 1;
+  bool bool_field_2 = 2;
+  string string_field_3 = 3;
+  bytes bytes_field_4 = 4;
+  double double_field_5 = 5;
+  float float_field_6 = 6;
+  uint32 uint32_field_7 = 7;
+  int64 int64_field_8 = 8;
+  uint64 uint64_field_9 = 9;
+  bytes bytes_field_10 = 10;
+  double double_field_11 = 11;
+  bytes bytes_field_12 = 12;
+  bool bool_field_13 = 13;
+  string string_field_14 = 14;
+  float float_field_15 = 15;
+  int32 int32_field_16 = 16;
+  bytes bytes_field_17 = 17;
+  bool bool_field_18 = 18;
+  string string_field_19 = 19;
+  float float_field_20 = 20;
+  fixed32 fixed32_field_21 = 21;
+  fixed64 fixed64_field_22 = 22;
+  sfixed32 sfixed32_field_23 = 23;
+  sfixed64 sfixed64_field_24 = 24;
+  double double_field_25 = 25;
+  uint32 uint32_field_26 = 26;
+  uint64 uint64_field_27 = 27;
+  bool bool_field_28 = 28;
+  // Only field without a "<type>_field_" prefix; the tests rely on the generated
+  // getField29List()/getField29Count() names, so keep it as is.
+  repeated string field_29 = 29;
+  float float_field_30 = 30;
+  string string_field_31 = 31;
+
+  // Map fields: row positions 31 and 32 in the tests.
+  map<string, bytes> map_field_32 = 32;
+  map<string, string> map_field_33 = 33;
+}


Reply via email to