ljw-hit commented on code in PR #23162:
URL: https://github.com/apache/flink/pull/23162#discussion_r1390343343


##########
flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
+import org.apache.flink.formats.protobuf.testproto.BigPbClass;
+import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.ByteString;
+import org.junit.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for below case
+ *
+ * <PRE>
+ * syntax = "proto3";
+ * package org.apache.flink.formats.protobuf.testproto;
+ * option java_package = "org.apache.flink.formats.protobuf.testproto";
+ * option java_outer_classname = "BigPbClass";
+ * import "google/protobuf/descriptor.proto";
+ * message BigPbMessage {
+ * </PRE>
+ *
+ * <p>It is valid proto definition.
+ */
+public class BigPbRowToProtoTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        GenericRowData rowData = new GenericRowData(34);
+        rowData.setField(7, 20);
+        rowData.setField(8, StringData.fromString("test1"));
+        rowData.setField(9, false);
+        rowData.setField(10, 1F);
+        rowData.setField(11, 2D);
+        rowData.setField(12, new byte[] {1, 2, 3});

Review Comment:
   Hmm, thank you for your suggestion, I will fix it.



##########
flink-formats/flink-protobuf/src/test/proto/test_big_pb.proto:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.formats.protobuf.testproto;
+
+option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_outer_classname = "BigPbClass";
+
+message BigPbMessage {
+  repeated string field = 1;
+  int32 int_field = 2;
+  float float_field = 3;
+  double double_field = 4;
+  bool bool_field = 5;
+  string string_field = 6;
+  bytes bytes_field = 7;
+  int32 a_field_1 = 8;
+  string a_field_2 = 9 ;
+  bool a_field_3 = 10;
+  float b_field_1 = 11;
+  double b_field_2 = 12;
+  bytes b_field_3 = 13;
+  int64 c_field_1 = 14;
+  uint32 c_field_2 = 15;
+  uint64 c_field_3 = 16;
+  int32 e_field_1 = 17;
+  float e_field_2 = 18;
+  string e_field_3 = 19;
+  bool e_field_4 = 20;
+  bytes e_field_5 = 21;
+  double f_field_1 = 22;
+  uint32 f_field_2 = 23;
+  uint64 f_field_3 = 24;
+  fixed32 f_field_4 = 25;
+  fixed64 f_field_5 = 26;
+  sfixed32 f_field_6 = 27;
+  sfixed64 f_field_7 = 28;
+  float f_field_8 = 29;
+  bool f_field_9 = 30;
+  string f_field_10 = 31;
+  bytes f_field_11 = 32;

Review Comment:
   resolved



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenDeserializer.java:
##########
@@ -28,12 +29,19 @@
  * </PRE>
  */
 public interface PbCodegenDeserializer {
+
     /**
      * @param resultVar the final var name that is calculated by codegen. This var name will be used
-     *     by outsider codegen environment. {@code resultVariable} should be flink object
+     *     by outsider codegen environment. {@code returnInternalDataVarName} should be flink data
+     *     object
      * @param pbObjectCode may be a variable or expression. Current codegen environment can use this
-     *     literal name directly to access the input. {@code pbObject} should be a protobuf object
+     *     literal name directly to access the input. {@code pbGetStr} is a value coming from
+     *     protobuf object
+     * @param pbCodeSplitter when encode/decode method body over 4K, use PbCodeSplitter to Split

Review Comment:
   resolved



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbCodeSplitter.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.util.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.util.PbCodegenVarId;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * PbCodeSplitter to split the serialization and deserialization code, for RowType each
+ * element,codeSpiltter will merge code segments of multiple elements, and split the code segment
+ * into a method if the code segment exceeds the threshold, and store the method in the
+ * splitMethodStack.
+ */
+public class PbCodeSplitter {
+    private final List<String> splitMethodStack = new ArrayList<>();
+
+    public PbCodeSplitter() {}

Review Comment:
   resolved



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java:
##########
@@ -203,26 +204,29 @@ public static String pbDefaultValueCode(
     }
 
     /**
-     * This method will be called from serializer of flink array/map type because flink contains
-     * both array/map type in array format. Map/Array cannot contain null value in pb object then we
-     * must do conversion in case of null values in map/array type.
+     * Used to split array/map type java code segment This method will be called from serializer of
+     * flink array/map type because flink contains both array/map type in array format. Map/Array
+     * cannot contain null value in pb object then we must do conversion in case of null values in
+     * map/array type.
      *
      * @param flinkArrDataVar code phrase represent arrayData of arr type or keyData/valueData in
      *     map type.
      * @param iVar the index in arrDataVar
      * @param resultPbVar the returned pb variable name in codegen.
      * @param elementPbFd {@link FieldDescriptor} of element type in proto object
      * @param elementDataType {@link LogicalType} of element type in flink object
+     * @param pbCodeSplitter protobuf code splitter
      * @return The java code segment which represents field value retrieval.
      */
-    public static String convertFlinkArrayElementToPbWithDefaultValueCode(
+    public static String convertFlinkArrayElementToPbWithDefaultValueCodeSplit(

Review Comment:
   resolved it



##########
flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
+import org.apache.flink.formats.protobuf.testproto.BigPbClass;
+import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.ByteString;
+import org.junit.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for below case
+ *
+ * <PRE>
+ * syntax = "proto3";
+ * package org.apache.flink.formats.protobuf.testproto;
+ * option java_package = "org.apache.flink.formats.protobuf.testproto";
+ * option java_outer_classname = "BigPbClass";
+ * import "google/protobuf/descriptor.proto";
+ * message BigPbMessage {
+ * </PRE>
+ *
+ * <p>It is valid proto definition.
+ */
+public class BigPbRowToProtoTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        GenericRowData rowData = new GenericRowData(34);
+        rowData.setField(7, 20);
+        rowData.setField(8, StringData.fromString("test1"));
+        rowData.setField(9, false);
+        rowData.setField(10, 1F);
+        rowData.setField(11, 2D);
+        rowData.setField(12, new byte[] {1, 2, 3});
+
+        byte[] bytes = ProtobufTestHelper.rowToPbBytes(rowData, BigPbClass.BigPbMessage.class);
+
+        BigPbClass.BigPbMessage bigPbMessage = BigPbClass.BigPbMessage.parseFrom(bytes);
+
+        assertEquals(20, bigPbMessage.getAField1());
+        assertEquals("test1", bigPbMessage.getAField2());
+        assertFalse(bigPbMessage.getAField3());
+        assertEquals(1F, bigPbMessage.getBField1());
+        assertEquals(2D, bigPbMessage.getBField2());
+        assertEquals(ByteString.copyFrom(new byte[] {1, 2, 3}), bigPbMessage.getBField3());
+    }
+
+    /*
+     * Flink-Protobuf serialize codegen code size is 13999, over code threshold.
+     * So pbCodeSplitter split the code.
+     */
+    @Test
+    public void testSerializeSplit() throws Exception {

Review Comment:
   Hmm, thanks for the suggestion, I will adopt it



##########
flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
+import org.apache.flink.formats.protobuf.testproto.BigPbClass;
+import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.ByteString;
+import org.junit.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for below case
+ *
+ * <PRE>
+ * syntax = "proto3";
+ * package org.apache.flink.formats.protobuf.testproto;
+ * option java_package = "org.apache.flink.formats.protobuf.testproto";
+ * option java_outer_classname = "BigPbClass";
+ * import "google/protobuf/descriptor.proto";
+ * message BigPbMessage {
+ * </PRE>
+ *
+ * <p>It is valid proto definition.

Review Comment:
   thank you for your suggestion, I will modify my comment.



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java:
##########
@@ -41,8 +42,8 @@ public PbCodegenSimpleSerializer(
         this.formatContext = formatContext;
     }
 
-    @Override
-    public String codegen(String resultVar, String flinkObjectCode, int indent)
+    public String codegenSplit(

Review Comment:
   resolved



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java:
##########
@@ -203,26 +204,29 @@ public static String pbDefaultValueCode(
     }
 
     /**
-     * This method will be called from serializer of flink array/map type because flink contains
-     * both array/map type in array format. Map/Array cannot contain null value in pb object then we
-     * must do conversion in case of null values in map/array type.
+     * Used to split array/map type java code segment This method will be called from serializer of

Review Comment:
   resolved



##########
flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema;
+import org.apache.flink.formats.protobuf.testproto.BigPbClass;
+import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.ByteString;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Test class for below case
+ *
+ * <PRE>
+ * syntax = "proto3";
+ * package org.apache.flink.formats.protobuf.testproto;
+ * option java_package = "org.apache.flink.formats.protobuf.testproto";
+ * option java_outer_classname = "BigPbClass";
+ * import "google/protobuf/descriptor.proto";
+ * message BigPbMessage {
+ * </PRE>
+ *
+ * <p>It is valid proto definition.
+ */
+public class BigPbProtoToRowTest {

Review Comment:
   resolved



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenDeserializer.java:
##########
@@ -28,12 +29,19 @@
  * </PRE>
  */
 public interface PbCodegenDeserializer {
+
     /**
      * @param resultVar the final var name that is calculated by codegen. This var name will be used
-     *     by outsider codegen environment. {@code resultVariable} should be flink object
+     *     by outsider codegen environment. {@code returnInternalDataVarName} should be flink data
+     *     object
      * @param pbObjectCode may be a variable or expression. Current codegen environment can use this
-     *     literal name directly to access the input. {@code pbObject} should be a protobuf object
+     *     literal name directly to access the input. {@code pbGetStr} is a value coming from

Review Comment:
   resolved



##########
flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
+import org.apache.flink.formats.protobuf.testproto.BigPbClass;
+import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.ByteString;
+import org.junit.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for below case
+ *
+ * <PRE>
+ * syntax = "proto3";
+ * package org.apache.flink.formats.protobuf.testproto;
+ * option java_package = "org.apache.flink.formats.protobuf.testproto";
+ * option java_outer_classname = "BigPbClass";
+ * import "google/protobuf/descriptor.proto";
+ * message BigPbMessage {
+ * </PRE>
+ *
+ * <p>It is valid proto definition.
+ */
+public class BigPbRowToProtoTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        GenericRowData rowData = new GenericRowData(34);
+        rowData.setField(7, 20);
+        rowData.setField(8, StringData.fromString("test1"));
+        rowData.setField(9, false);
+        rowData.setField(10, 1F);
+        rowData.setField(11, 2D);
+        rowData.setField(12, new byte[] {1, 2, 3});
+
+        byte[] bytes = ProtobufTestHelper.rowToPbBytes(rowData, BigPbClass.BigPbMessage.class);
+
+        BigPbClass.BigPbMessage bigPbMessage = BigPbClass.BigPbMessage.parseFrom(bytes);
+
+        assertEquals(20, bigPbMessage.getAField1());
+        assertEquals("test1", bigPbMessage.getAField2());
+        assertFalse(bigPbMessage.getAField3());
+        assertEquals(1F, bigPbMessage.getBField1());
+        assertEquals(2D, bigPbMessage.getBField2());
+        assertEquals(ByteString.copyFrom(new byte[] {1, 2, 3}), bigPbMessage.getBField3());
+    }
+
+    /*
+     * Flink-Protobuf serialize codegen code size is 13999, over code threshold.
+     * So pbCodeSplitter split the code.

Review Comment:
   resolved



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java:
##########
@@ -104,10 +106,19 @@ public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
             PbCodegenDeserializer codegenDes =
                     PbCodegenDeserializeFactory.getPbCodegenTopRowDes(
                             descriptor, rowType, pbFormatContext);
-            String genCode = codegenDes.codegen("rowData", "message", 0);
+            // if codgen generate code size over threshod then split the code
+            PbCodeSplitter pbCodeSplitter = new PbCodeSplitter();
+            LOG.info("Fast-pb generate split deserialize code");

Review Comment:
   resolved



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java:
##########
@@ -129,4 +140,8 @@ public RowData convertProtoBinaryToRow(byte[] data) throws Exception {
         Object messageObj = parseFromMethod.invoke(null, data);
         return (RowData) decodeMethod.invoke(null, messageObj);
     }
+
+    public boolean isCodeSplit() {

Review Comment:
   resolved



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java:
##########
@@ -27,4 +27,10 @@ public class PbConstant {
     public static final String PB_MAP_KEY_NAME = "key";
     public static final String PB_MAP_VALUE_NAME = "value";
     public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
+    /**
+     * JIT optimizer threshold is 8K, unicode encode one char use 2byte, so use 3K as

Review Comment:
   Resolved. Thank you for your suggestion; I will modify my comment. By the way, if 1 character corresponds to 1 byte, does this threshold need to be modified?
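
   The character-versus-byte question above can be made concrete with a small sketch. It only measures the generated *source* string in two encodings; the class and method names are hypothetical and not part of the PR. As far as I know, HotSpot's huge-method limit (about 8000 bytes, governed by `-XX:-DontCompileHugeMethods`) is counted in compiled bytecode rather than source characters, so any source-length threshold is a heuristic either way.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of the PR: compares source-size measures. */
public class SourceSizeSketch {

    /** Bytes needed to store the generated source as UTF-8 (1 byte per ASCII char). */
    public static int utf8Bytes(String code) {
        return code.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Bytes needed to store the generated source as UTF-16 (2 bytes per BMP char). */
    public static int utf16Bytes(String code) {
        return code.getBytes(StandardCharsets.UTF_16BE).length;
    }

    public static void main(String[] args) {
        // A typical generated statement contains only ASCII characters.
        String code = "rowData.setField(7, 20);";
        System.out.println("chars       = " + code.length());
        System.out.println("UTF-8 bytes = " + utf8Bytes(code));
        System.out.println("UTF-16 bytes= " + utf16Bytes(code));
    }
}
```

   For ASCII-only generated code the character count equals the UTF-8 byte count and is half the UTF-16 byte count, so the "2 bytes per char" assumption depends on which encoding the comment has in mind; neither figure is the bytecode size that the JIT actually checks.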



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbCodeSplitter.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.util.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.util.PbCodegenVarId;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * PbCodeSplitter to split the serialization and deserialization code, for RowType each
+ * element,codeSpiltter will merge code segments of multiple elements, and split the code segment
+ * into a method if the code segment exceeds the threshold, and store the method in the
+ * splitMethodStack.
+ */
+public class PbCodeSplitter {
+    private final List<String> splitMethodStack = new ArrayList<>();
+
+    public PbCodeSplitter() {}
+
+    public String splitDeserializerRowTypeMethod(
+            String rowDataVar, String messageTypeStr, String messageTypeVar, String code) {
+        int uid = PbCodegenVarId.getInstance().getAndIncrement();
+        String splitMethodName = "split" + uid;
+        PbCodegenAppender pbCodegenAppender = new PbCodegenAppender();
+        pbCodegenAppender.appendSegment(
+                String.format(
+                        "private static void %s (GenericRowData %s, %s %s) {\n %s \n}",
+                        splitMethodName, rowDataVar, messageTypeStr, messageTypeVar, code));
+        splitMethodStack.add(pbCodegenAppender.code());
+        return String.format("%s(%s, %s);", splitMethodName, rowDataVar, messageTypeVar);
+    }
+
+    public String splitSerializerRowTypeMethod(

Review Comment:
   Thank you for such targeted suggestions.
   1. I have found a way to reuse the code, so I can solve this part.
   2. Regarding the second point, I am a little confused. Do you mean that pbCodeSplitter is not needed, and that all the codeSplit logic should be put into pbFormatContext?
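
   The merge-then-split behavior that the PbCodeSplitter Javadoc describes can be sketched roughly as follows. This is a minimal illustration under assumptions, not the PR's implementation: the class `SegmentSplitterSketch`, its methods, and the `(Object row, Object msg)` signature are all hypothetical, and the threshold is counted in source characters.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration; not the PbCodeSplitter from this PR. */
public class SegmentSplitterSketch {
    private final int threshold;                                // max chars per extracted method body
    private final List<String> splitMethods = new ArrayList<>(); // extracted method sources
    private final StringBuilder pending = new StringBuilder();   // segments merged so far
    private final StringBuilder body = new StringBuilder();      // call sites for the top-level method
    private int nextId = 0;

    public SegmentSplitterSketch(int threshold) {
        this.threshold = threshold;
    }

    /** Merges one per-field segment; flushes first if adding it would exceed the threshold. */
    public void append(String segment) {
        if (pending.length() > 0 && pending.length() + segment.length() > threshold) {
            flush();
        }
        pending.append(segment).append('\n');
    }

    /** Moves the merged segments into a new static method and records a call to it. */
    private void flush() {
        if (pending.length() == 0) {
            return;
        }
        String name = "split" + nextId++;
        splitMethods.add(
                "private static void " + name + "(Object row, Object msg) {\n" + pending + "}");
        body.append(name).append("(row, msg);\n");
        pending.setLength(0);
    }

    /** Flushes the remainder and returns the call sequence for the top-level method. */
    public String finish() {
        flush();
        return body.toString();
    }

    public List<String> methods() {
        return splitMethods;
    }

    public static void main(String[] args) {
        SegmentSplitterSketch splitter = new SegmentSplitterSketch(40);
        splitter.append("row.setField(0, msg.getA());");
        splitter.append("row.setField(1, msg.getB());");
        System.out.println(splitter.finish());
        splitter.methods().forEach(System.out::println);
    }
}
```

   Whether this state lives in a standalone splitter or inside the format context is a placement decision; the accumulation logic itself would look much the same in either location.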



##########
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSerializer.java:
##########
@@ -28,13 +29,18 @@
  * </PRE>
  */
 public interface PbCodegenSerializer {
+
     /**
      * @param resultVar the final var name that is calculated by codegen. This var name will be used
-     *     by outsider codegen environment. {@code resultVariable} should be protobuf object
-     * @param flinkObjectCode may be a variable or expression. Current codegen environment can use
-     *     this literal name directly to access the input. {@code flinkObject} should be a flink
-     *     internal object.
+     *     by outsider codegen environment. {@code returnPbVarName} should be protobuf object
+     * @param internalDataGetStr may be a variable or expression. Current codegen environment can
+     *     use this literal name directly to access the input. {@code internalDataGetStr} is a value
+     *     coming from flink object.
+     * @param pbCodeSplitter when encode/decode method body over 4K, use PbCodeSplitter to Split

Review Comment:
   resolved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.