This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a2ec4c3b8bd [FLINK-32650][protobuf] Split generated code in protobuf format to mitigate huge method problems a2ec4c3b8bd is described below commit a2ec4c3b8bd9f2009fd721ab4c51afedcd8574fb Author: lijingwei.5018 <lijingwei.5...@bytedance.com> AuthorDate: Mon Nov 13 11:32:43 2023 +0800 [FLINK-32650][protobuf] Split generated code in protobuf format to mitigate huge method problems Close apache/flink#23162 --- .../apache/flink/formats/protobuf/PbConstant.java | 6 + .../flink/formats/protobuf/PbFormatContext.java | 44 ++++++ .../deserialize/PbCodegenRowDeserializer.java | 30 +++- .../PbRowDataDeserializationSchema.java | 6 + .../protobuf/deserialize/ProtoToRowConverter.java | 13 ++ .../protobuf/serialize/PbCodegenRowSerializer.java | 37 ++++- .../serialize/PbRowDataSerializationSchema.java | 6 + .../protobuf/serialize/RowToProtoConverter.java | 13 ++ .../formats/protobuf/util/PbCodegenUtils.java | 4 + .../formats/protobuf/BigPbProtoToRowTest.java | 140 +++++++++++++++++ .../formats/protobuf/BigPbRowToProtoTest.java | 166 +++++++++++++++++++++ .../src/test/proto/test_big_pb.proto | 61 ++++++++ 12 files changed, 512 insertions(+), 14 deletions(-) diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java index ea7d6514c56..4a39794e68d 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java @@ -27,4 +27,10 @@ public class PbConstant { public static final String PB_MAP_KEY_NAME = "key"; public static final String PB_MAP_VALUE_NAME = "value"; public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass"; + /** + * The JIT does not compile methods whose bytecode exceeds 8000 bytes (HotSpot's HugeMethodLimit); generated source length is only a rough proxy for bytecode size, so use 4K characters as
the codegen split threshold. This conservative value keeps the code of several RowType element + segments, once combined into one method, from exceeding 8K. + */ + public static final int CODEGEN_SPLIT_THRESHOLD = 4000; } diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java index 27ceb0fb49d..9302cc274c3 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java @@ -18,14 +18,58 @@ package org.apache.flink.formats.protobuf; +import org.apache.flink.formats.protobuf.util.PbCodegenAppender; +import org.apache.flink.formats.protobuf.util.PbCodegenVarId; + +import java.util.ArrayList; +import java.util.List; + /** store config and common information. */ public class PbFormatContext { private final PbFormatConfig pbFormatConfig; + private final List<String> splitMethodStack = new ArrayList<>(); public PbFormatContext(PbFormatConfig pbFormatConfig) { this.pbFormatConfig = pbFormatConfig; } + private String createSplitMethod( + String rowDataType, + String rowDataVar, + String messageTypeStr, + String messageTypeVar, + String code) { + int uid = PbCodegenVarId.getInstance().getAndIncrement(); + String splitMethodName = "split" + uid; + PbCodegenAppender pbCodegenAppender = new PbCodegenAppender(); + pbCodegenAppender.appendSegment( + String.format( + "private static void %s (%s %s, %s %s) {\n %s \n}", + splitMethodName, + rowDataType, + rowDataVar, + messageTypeStr, + messageTypeVar, + code)); + splitMethodStack.add(pbCodegenAppender.code()); + return String.format("%s(%s, %s);", splitMethodName, rowDataVar, messageTypeVar); + } + + public String splitDeserializerRowTypeMethod( + String rowDataVar, String messageTypeStr, String messageTypeVar, String code) { + return 
createSplitMethod( + "GenericRowData", rowDataVar, messageTypeStr, messageTypeVar, code); + } + + public String splitSerializerRowTypeMethod( + String rowDataVar, String messageTypeStr, String messageTypeVar, String code) { + return createSplitMethod("RowData", rowDataVar, messageTypeStr, messageTypeVar, code); + } + + public List<String> getSplitMethodStack() { + return splitMethodStack; + } + public PbFormatConfig getPbFormatConfig() { return pbFormatConfig; } diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java index d1fc0c60726..f3cf414f540 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java @@ -21,6 +21,7 @@ package org.apache.flink.formats.protobuf.deserialize; import org.apache.flink.formats.protobuf.PbCodegenException; import org.apache.flink.formats.protobuf.PbFormatContext; import org.apache.flink.formats.protobuf.util.PbCodegenAppender; +import org.apache.flink.formats.protobuf.util.PbCodegenUtils; import org.apache.flink.formats.protobuf.util.PbCodegenVarId; import org.apache.flink.formats.protobuf.util.PbFormatUtils; import org.apache.flink.table.types.logical.LogicalType; @@ -59,6 +60,7 @@ public class PbCodegenRowDeserializer implements PbCodegenDeserializer { appender.appendLine( "GenericRowData " + flinkRowDataVar + " = new GenericRowData(" + fieldSize + ")"); int index = 0; + PbCodegenAppender splitAppender = new PbCodegenAppender(indent); for (String fieldName : rowType.getFieldNames()) { int subUid = varUid.getAndIncrement(); String flinkRowEleVar = "elementDataVar" + subUid; @@ -68,7 +70,7 @@ public class PbCodegenRowDeserializer implements 
PbCodegenDeserializer { String strongCamelFieldName = PbFormatUtils.getStrongCamelCaseJsonName(fieldName); PbCodegenDeserializer codegen = PbCodegenDeserializeFactory.getPbCodegenDes(elementFd, subType, formatContext); - appender.appendLine("Object " + flinkRowEleVar + " = null"); + splitAppender.appendLine("Object " + flinkRowEleVar + " = null"); if (!formatContext.getPbFormatConfig().isReadDefaultValues()) { // only works in syntax=proto2 and readDefaultValues=false // readDefaultValues must be true in pb3 mode @@ -77,7 +79,7 @@ public class PbCodegenRowDeserializer implements PbCodegenDeserializer { pbMessageVar, strongCamelFieldName, PbFormatUtils.isRepeatedType(subType)); - appender.begin("if(" + isMessageElementNonEmptyCode + "){"); + splitAppender.begin("if(" + isMessageElementNonEmptyCode + "){"); } String pbGetMessageElementCode = pbGetMessageElementCode( @@ -87,15 +89,31 @@ public class PbCodegenRowDeserializer implements PbCodegenDeserializer { PbFormatUtils.isArrayType(subType)); String code = codegen.codegen( - flinkRowEleVar, pbGetMessageElementCode, appender.currentIndent()); - appender.appendSegment(code); + flinkRowEleVar, pbGetMessageElementCode, splitAppender.currentIndent()); + splitAppender.appendSegment(code); if (!formatContext.getPbFormatConfig().isReadDefaultValues()) { - appender.end("}"); + splitAppender.end("}"); } - appender.appendLine( + splitAppender.appendLine( flinkRowDataVar + ".setField(" + index + ", " + flinkRowEleVar + ")"); + if (PbCodegenUtils.needToSplit(splitAppender.code().length())) { + String splitMethod = + formatContext.splitDeserializerRowTypeMethod( + flinkRowDataVar, + pbMessageTypeStr, + pbMessageVar, + splitAppender.code()); + appender.appendSegment(splitMethod); + splitAppender = new PbCodegenAppender(); + } index += 1; } + if (!splitAppender.code().isEmpty()) { + String splitMethod = + formatContext.splitDeserializerRowTypeMethod( + flinkRowDataVar, pbMessageTypeStr, pbMessageVar, splitAppender.code()); + 
appender.appendSegment(splitMethod); + } appender.appendLine(resultVar + " = " + flinkRowDataVar); return appender.code(); } diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java index 5d3872713d2..4225c6390c6 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java @@ -18,6 +18,7 @@ package org.apache.flink.formats.protobuf.deserialize; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.formats.protobuf.PbFormatConfig; @@ -76,6 +77,11 @@ public class PbRowDataDeserializationSchema implements DeserializationSchema<Row } } + @VisibleForTesting + public boolean isCodeSplit() { + return protoToRowConverter.isCodeSplit(); + } + @Override public boolean isEndOfStream(RowData nextElement) { return false; diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java index 4564efce2a4..2c132e892fb 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java @@ -18,6 +18,7 @@ package org.apache.flink.formats.protobuf.deserialize; +import org.apache.flink.annotation.VisibleForTesting; import 
org.apache.flink.formats.protobuf.PbCodegenException; import org.apache.flink.formats.protobuf.PbConstant; import org.apache.flink.formats.protobuf.PbFormatConfig; @@ -54,6 +55,7 @@ public class ProtoToRowConverter { private static final Logger LOG = LoggerFactory.getLogger(ProtoToRowConverter.class); private final Method parseFromMethod; private final Method decodeMethod; + private boolean isCodeSplit = false; public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig) throws PbCodegenException { @@ -108,6 +110,12 @@ public class ProtoToRowConverter { codegenAppender.appendSegment(genCode); codegenAppender.appendLine("return rowData"); codegenAppender.appendSegment("}"); + if (!pbFormatContext.getSplitMethodStack().isEmpty()) { + isCodeSplit = true; + for (String splitMethod : pbFormatContext.getSplitMethodStack()) { + codegenAppender.appendSegment(splitMethod); + } + } codegenAppender.appendSegment("}"); String printCode = codegenAppender.printWithLineNumber(); @@ -129,4 +137,9 @@ public class ProtoToRowConverter { Object messageObj = parseFromMethod.invoke(null, data); return (RowData) decodeMethod.invoke(null, messageObj); } + + @VisibleForTesting + protected boolean isCodeSplit() { + return isCodeSplit; + } } diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java index 083a628b70d..342117cd272 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java @@ -63,6 +63,7 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer { + pbMessageTypeStr + ".newBuilder()"); int index = 0; + PbCodegenAppender splitAppender = new PbCodegenAppender(indent); for (String 
fieldName : rowType.getFieldNames()) { Descriptors.FieldDescriptor elementFd = descriptor.findFieldByName(fieldName); LogicalType subType = rowType.getTypeAt(rowType.getFieldIndex(fieldName)); @@ -80,17 +81,18 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer { // Only set non-null element of flink row to proto object. The real value in proto // result depends on protobuf implementation. - appender.begin("if(!" + flinkRowDataVar + ".isNullAt(" + index + ")){"); - appender.appendLine(elementPbTypeStr + " " + elementPbVar); + splitAppender.begin("if(!" + flinkRowDataVar + ".isNullAt(" + index + ")){"); + splitAppender.appendLine(elementPbTypeStr + " " + elementPbVar); String flinkRowElementCode = PbCodegenUtils.flinkContainerElementCode(flinkRowDataVar, index + "", subType); PbCodegenSerializer codegen = PbCodegenSerializeFactory.getPbCodegenSer(elementFd, subType, formatContext); String code = - codegen.codegen(elementPbVar, flinkRowElementCode, appender.currentIndent()); - appender.appendSegment(code); + codegen.codegen( + elementPbVar, flinkRowElementCode, splitAppender.currentIndent()); + splitAppender.appendSegment(code); if (subType.getTypeRoot() == LogicalTypeRoot.ARRAY) { - appender.appendLine( + splitAppender.appendLine( messageBuilderVar + ".addAll" + strongCamelFieldName @@ -98,7 +100,7 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer { + elementPbVar + ")"); } else if (subType.getTypeRoot() == LogicalTypeRoot.MAP) { - appender.appendLine( + splitAppender.appendLine( messageBuilderVar + ".putAll" + strongCamelFieldName @@ -106,7 +108,7 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer { + elementPbVar + ")"); } else { - appender.appendLine( + splitAppender.appendLine( messageBuilderVar + ".set" + strongCamelFieldName @@ -114,9 +116,28 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer { + elementPbVar + ")"); } - appender.end("}"); + splitAppender.end("}"); + if 
(PbCodegenUtils.needToSplit(splitAppender.code().length())) { + String splitMethod = + formatContext.splitSerializerRowTypeMethod( + flinkRowDataVar, + pbMessageTypeStr + ".Builder", + messageBuilderVar, + splitAppender.code()); + appender.appendSegment(splitMethod); + splitAppender = new PbCodegenAppender(); + } index += 1; } + if (!splitAppender.code().isEmpty()) { + String splitMethod = + formatContext.splitSerializerRowTypeMethod( + flinkRowDataVar, + pbMessageTypeStr + ".Builder", + messageBuilderVar, + splitAppender.code()); + appender.appendSegment(splitMethod); + } appender.appendLine(resultVar + " = " + messageBuilderVar + ".build()"); return appender.code(); } diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java index 24484dd71b4..b268e148f8e 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java @@ -18,6 +18,7 @@ package org.apache.flink.formats.protobuf.serialize; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.formats.protobuf.PbFormatConfig; import org.apache.flink.formats.protobuf.util.PbFormatUtils; @@ -55,6 +56,11 @@ public class PbRowDataSerializationSchema implements SerializationSchema<RowData rowToProtoConverter = new RowToProtoConverter(rowType, pbFormatConfig); } + @VisibleForTesting + public boolean isCodeSplit() { + return rowToProtoConverter.isCodeSplit(); + } + @Override public byte[] serialize(RowData element) { try { diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java index df701a515a2..d7a3ef48e41 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java @@ -18,6 +18,7 @@ package org.apache.flink.formats.protobuf.serialize; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.formats.protobuf.PbCodegenException; import org.apache.flink.formats.protobuf.PbConstant; import org.apache.flink.formats.protobuf.PbFormatConfig; @@ -51,6 +52,7 @@ import java.util.UUID; public class RowToProtoConverter { private static final Logger LOG = LoggerFactory.getLogger(ProtoToRowConverter.class); private final Method encodeMethod; + private boolean isCodeSplit = false; public RowToProtoConverter(RowType rowType, PbFormatConfig formatConfig) throws PbCodegenException { @@ -89,6 +91,12 @@ public class RowToProtoConverter { codegenAppender.appendSegment(genCode); codegenAppender.appendLine("return message"); codegenAppender.end("}"); + if (!formatContext.getSplitMethodStack().isEmpty()) { + isCodeSplit = true; + for (String spliteMethod : formatContext.getSplitMethodStack()) { + codegenAppender.appendSegment(spliteMethod); + } + } codegenAppender.end("}"); String printCode = codegenAppender.printWithLineNumber(); @@ -109,4 +117,9 @@ public class RowToProtoConverter { AbstractMessage message = (AbstractMessage) encodeMethod.invoke(null, rowData); return message.toByteArray(); } + + @VisibleForTesting + protected boolean isCodeSplit() { + return isCodeSplit; + } } diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java index 042392e302b..87643aac72c 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java @@ -264,4 +264,8 @@ public class PbCodegenUtils { } return simpleCompiler.getClassLoader().loadClass(className); } + + public static boolean needToSplit(int noSplitCodeSize) { + return noSplitCodeSize >= PbConstant.CODEGEN_SPLIT_THRESHOLD; + } } diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java new file mode 100644 index 00000000000..ad0ee2efae3 --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema; +import org.apache.flink.formats.protobuf.testproto.BigPbClass; +import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import com.google.protobuf.ByteString; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** + * Test for huge proto definition, which may trigger some special optimizations such as code + * splitting. + */ +public class BigPbProtoToRowTest { + + @Test + public void testSimple() throws Exception { + BigPbClass.BigPbMessage bigPbMessage = + BigPbClass.BigPbMessage.newBuilder() + .setIntField1(5) + .setBoolField2(false) + .setStringField3("test1") + .setBytesField4(ByteString.copyFrom(new byte[] {1, 2, 3})) + .setDoubleField5(2.5) + .setFloatField6(1.5F) + .setUint32Field7(3) + .setInt64Field8(7L) + .setUint64Field9(9L) + .setBytesField10(ByteString.copyFrom(new byte[] {4, 5, 6})) + .setDoubleField11(6.5) + .setBytesField12(ByteString.copyFrom(new byte[] {7, 8, 9})) + .setBoolField13(true) + .setStringField14("test2") + .setFloatField15(3.5F) + .setInt32Field16(8) + .setBytesField17(ByteString.copyFrom(new byte[] {10, 11, 12})) + .setBoolField18(true) + .setStringField19("test3") + .setFloatField20(4.5F) + .setFixed32Field21(1) + .setFixed64Field22(2L) + .setSfixed32Field23(3) + .setSfixed64Field24(4L) + .setDoubleField25(5.5) + .setUint32Field26(6) + .setUint64Field27(7L) + .setBoolField28(true) + .addField29("value1") + .addField29("value2") + .addField29("value3") + .setFloatField30(8.5F) + .setStringField31("test4") + .putMapField32("key1", 
ByteString.copyFrom(new byte[] {13, 14, 15})) + .putMapField32("key2", ByteString.copyFrom(new byte[] {16, 17, 18})) + .putMapField33("key1", "value1") + .putMapField33("key2", "value2") + .build(); + + RowData row = + ProtobufTestHelper.pbBytesToRow( + BigPbClass.BigPbMessage.class, bigPbMessage.toByteArray()); + + assertEquals(5, row.getInt(0)); + assertFalse(row.getBoolean(1)); + assertEquals("test1", row.getString(2).toString()); + assertArrayEquals(new byte[] {1, 2, 3}, row.getBinary(3)); + assertEquals(2.5, row.getDouble(4), 0.0); + assertEquals(1.5F, row.getFloat(5), 0.0); + assertEquals(3, row.getInt(6)); + assertEquals(7L, row.getLong(7)); + assertEquals(9L, row.getLong(8)); + assertArrayEquals(new byte[] {4, 5, 6}, row.getBinary(9)); + assertEquals(6.5, row.getDouble(10), 0.0); + assertArrayEquals(new byte[] {7, 8, 9}, row.getBinary(11)); + assertTrue(row.getBoolean(12)); + assertEquals("test2", row.getString(13).toString()); + assertEquals(3.5F, row.getFloat(14), 0.0); + assertEquals(8, row.getInt(15)); + assertArrayEquals(new byte[] {10, 11, 12}, row.getBinary(16)); + assertTrue(row.getBoolean(17)); + assertEquals("test3", row.getString(18).toString()); + assertEquals(4.5F, row.getFloat(19), 0.0); + assertEquals(1, row.getInt(20)); + assertEquals(2L, row.getLong(21)); + assertEquals(3, row.getInt(22)); + assertEquals(4L, row.getLong(23)); + assertEquals(5.5, row.getDouble(24), 0.0); + assertEquals(6, row.getInt(25)); + assertEquals(7L, row.getLong(26)); + assertTrue(row.getBoolean(27)); + assertEquals("value1", row.getArray(28).getString(0).toString()); + assertEquals("value2", row.getArray(28).getString(1).toString()); + assertEquals("value3", row.getArray(28).getString(2).toString()); + assertEquals(8.5F, row.getFloat(29), 0.0); + assertEquals("test4", row.getString(30).toString()); + assertArrayEquals(new byte[] {13, 14, 15}, row.getMap(31).valueArray().getBinary(0)); + assertArrayEquals(new byte[] {16, 17, 18}, 
row.getMap(31).valueArray().getBinary(1)); + assertEquals("value1", row.getMap(32).valueArray().getString(0).toString()); + assertEquals("value2", row.getMap(32).valueArray().getString(1).toString()); + } + + @Test + public void testSplitInDeserialization() throws Exception { + RowType rowType = PbToRowTypeUtil.generateRowType(BigPbClass.BigPbMessage.getDescriptor()); + PbFormatConfig formatConfig = + new PbFormatConfig(BigPbClass.BigPbMessage.class.getName(), false, false, ""); + PbRowDataDeserializationSchema pbRowDataDeserializationSchema = + new PbRowDataDeserializationSchema( + rowType, InternalTypeInfo.of(rowType), formatConfig); + pbRowDataDeserializationSchema.open(null); + // make sure code is split + assertTrue(pbRowDataDeserializationSchema.isCodeSplit()); + } +} diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java new file mode 100644 index 00000000000..2c9094db384 --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema; +import org.apache.flink.formats.protobuf.testproto.BigPbClass; +import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.RowType; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Test for huge proto definition, which may trigger some special optimizations such as code + * splitting. 
+ */ +public class BigPbRowToProtoTest { + + @Test + public void testSimple() throws Exception { + GenericRowData rowData = new GenericRowData(33); + rowData.setField(0, 20); + rowData.setField(1, false); + rowData.setField(2, StringData.fromString("test1")); + rowData.setField(3, new byte[] {1, 2, 3}); + rowData.setField(4, 2.5); + rowData.setField(5, 1.5F); + rowData.setField(6, 3); + rowData.setField(7, 7L); + rowData.setField(8, 9L); + rowData.setField(9, new byte[] {4, 5, 6}); + rowData.setField(10, 6.5); + rowData.setField(11, new byte[] {7, 8, 9}); + rowData.setField(12, true); + rowData.setField(13, StringData.fromString("test2")); + rowData.setField(14, 3.5F); + rowData.setField(15, 8); + rowData.setField(16, new byte[] {10, 11, 12}); + rowData.setField(17, true); + rowData.setField(18, StringData.fromString("test3")); + rowData.setField(19, 4.5F); + rowData.setField(20, 1); + rowData.setField(21, 2L); + rowData.setField(22, 3); + rowData.setField(23, 4L); + rowData.setField(24, 5.5); + rowData.setField(25, 6); + rowData.setField(26, 7L); + rowData.setField(27, true); + rowData.setField( + 28, + new GenericArrayData( + new StringData[] { + StringData.fromString("value1"), + StringData.fromString("value2"), + StringData.fromString("value3") + })); + rowData.setField(29, 3.5F); + rowData.setField(30, StringData.fromString("test4")); + rowData.setField(31, null); + Map<StringData, StringData> map1 = + new HashMap() { + { + put(StringData.fromString("key1"), StringData.fromString("value1")); + put(StringData.fromString("key2"), StringData.fromString("value2")); + put(StringData.fromString("key3"), StringData.fromString("value3")); + } + }; + rowData.setField(32, new GenericMapData(map1)); + + byte[] bytes = ProtobufTestHelper.rowToPbBytes(rowData, BigPbClass.BigPbMessage.class); + + BigPbClass.BigPbMessage bigPbMessage = BigPbClass.BigPbMessage.parseFrom(bytes); + assertEquals(rowData.getField(0), bigPbMessage.getIntField1()); + 
assertEquals(rowData.getField(1), bigPbMessage.getBoolField2()); + assertEquals(rowData.getField(2).toString(), bigPbMessage.getStringField3()); + assertArrayEquals( + ((byte[]) rowData.getField(3)), bigPbMessage.getBytesField4().toByteArray()); + assertEquals(rowData.getField(4), bigPbMessage.getDoubleField5()); + assertEquals(rowData.getField(5), bigPbMessage.getFloatField6()); + assertEquals(rowData.getField(6), bigPbMessage.getUint32Field7()); + assertEquals(rowData.getField(7), bigPbMessage.getInt64Field8()); + assertEquals(rowData.getField(8), bigPbMessage.getUint64Field9()); + assertArrayEquals( + ((byte[]) rowData.getField(9)), bigPbMessage.getBytesField10().toByteArray()); + assertEquals(rowData.getField(10), bigPbMessage.getDoubleField11()); + assertArrayEquals( + ((byte[]) rowData.getField(11)), bigPbMessage.getBytesField12().toByteArray()); + assertEquals(rowData.getField(12), bigPbMessage.getBoolField13()); + assertEquals(rowData.getField(13).toString(), bigPbMessage.getStringField14()); + assertEquals(rowData.getField(14), bigPbMessage.getFloatField15()); + assertEquals(rowData.getField(15), bigPbMessage.getInt32Field16()); + assertArrayEquals( + ((byte[]) rowData.getField(16)), bigPbMessage.getBytesField17().toByteArray()); + assertEquals(rowData.getField(17), bigPbMessage.getBoolField18()); + assertEquals(rowData.getField(18).toString(), bigPbMessage.getStringField19()); + assertEquals(rowData.getField(19), bigPbMessage.getFloatField20()); + assertEquals(rowData.getField(20), bigPbMessage.getFixed32Field21()); + assertEquals(rowData.getField(21), bigPbMessage.getFixed64Field22()); + assertEquals(rowData.getField(22), bigPbMessage.getSfixed32Field23()); + assertEquals(rowData.getField(23), bigPbMessage.getSfixed64Field24()); + assertEquals(rowData.getField(24), bigPbMessage.getDoubleField25()); + assertEquals(rowData.getField(25), bigPbMessage.getUint32Field26()); + assertEquals(rowData.getField(26), bigPbMessage.getUint64Field27()); + 
assertEquals(rowData.getField(27), bigPbMessage.getBoolField28()); + assertEquals( + ((GenericArrayData) rowData.getField(28)).getString(0).toString(), + bigPbMessage.getField29List().get(0)); + assertEquals( + ((GenericArrayData) rowData.getField(28)).getString(1).toString(), + bigPbMessage.getField29List().get(1)); + assertEquals( + ((GenericArrayData) rowData.getField(28)).getString(2).toString(), + bigPbMessage.getField29List().get(2)); + assertEquals(rowData.getField(29), bigPbMessage.getFloatField30()); + assertEquals(rowData.getField(30).toString(), bigPbMessage.getStringField31()); + + ArrayData keySet = rowData.getMap(32).keyArray(); + ArrayData valueSet = rowData.getMap(32).valueArray(); + assertEquals(keySet.getString(0).toString(), "key2"); + assertEquals(keySet.getString(1).toString(), "key3"); + assertEquals(keySet.getString(2).toString(), "key1"); + assertEquals(valueSet.getString(0).toString(), "value2"); + assertEquals(valueSet.getString(1).toString(), "value3"); + assertEquals(valueSet.getString(2).toString(), "value1"); + } + + @Test + public void testSplitInSerialization() throws Exception { + RowType rowType = PbToRowTypeUtil.generateRowType(BigPbClass.BigPbMessage.getDescriptor()); + PbFormatConfig formatConfig = + new PbFormatConfig(BigPbClass.BigPbMessage.class.getName(), false, false, ""); + PbRowDataSerializationSchema pbRowDataSerializationSchema = + new PbRowDataSerializationSchema(rowType, formatConfig); + pbRowDataSerializationSchema.open(null); + // make sure code is split + assertTrue(pbRowDataSerializationSchema.isCodeSplit()); + } +} diff --git a/flink-formats/flink-protobuf/src/test/proto/test_big_pb.proto b/flink-formats/flink-protobuf/src/test/proto/test_big_pb.proto new file mode 100644 index 00000000000..85207c302f3 --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/proto/test_big_pb.proto @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +package org.apache.flink.formats.protobuf.testproto; + +option java_package = "org.apache.flink.formats.protobuf.testproto"; +option java_outer_classname = "BigPbClass"; + +message BigPbMessage { + int32 int_field_1 = 1; + bool bool_field_2 = 2; + string string_field_3 = 3; + bytes bytes_field_4 = 4; + double double_field_5 = 5; + float float_field_6 = 6; + uint32 uint32_field_7 = 7; + int64 int64_field_8 = 8; + uint64 uint64_field_9 = 9; + bytes bytes_field_10 = 10; + double double_field_11 = 11; + bytes bytes_field_12 = 12; + bool bool_field_13 = 13; + string string_field_14 = 14; + float float_field_15 = 15; + int32 int32_field_16 = 16; + bytes bytes_field_17 = 17; + bool bool_field_18 = 18; + string string_field_19 = 19; + float float_field_20 = 20; + fixed32 fixed32_field_21 = 21; + fixed64 fixed64_field_22 = 22; + sfixed32 sfixed32_field_23 = 23; + sfixed64 sfixed64_field_24 = 24; + double double_field_25 = 25; + uint32 uint32_field_26 = 26; + uint64 uint64_field_27 = 27; + bool bool_field_28 = 28; + repeated string field_29 = 29; + float float_field_30 = 30; + string string_field_31 = 31; + + map<string, bytes> map_field_32 = 32; + map<string, string> map_field_33 = 33; +}