This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 7ea4476c054 [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type 7ea4476c054 is described below commit 7ea4476c0544e17798cbb1e39609827954f6c266 Author: laughingman7743 <laughingman7...@gmail.com> AuthorDate: Wed Jan 4 23:12:19 2023 +0900 [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type Close #21613 --- .../docs/connectors/table/formats/protobuf.md | 5 ++ flink-formats/flink-protobuf/pom.xml | 1 + .../apache/flink/formats/protobuf/PbConstant.java | 1 + .../flink/formats/protobuf/PbFormatContext.java | 8 +-- .../deserialize/PbCodegenArrayDeserializer.java | 3 +- .../deserialize/PbCodegenMapDeserializer.java | 6 +- .../deserialize/PbCodegenRowDeserializer.java | 3 +- .../protobuf/deserialize/ProtoToRowConverter.java | 6 +- .../serialize/PbCodegenArraySerializer.java | 3 +- .../protobuf/serialize/PbCodegenMapSerializer.java | 6 +- .../protobuf/serialize/PbCodegenRowSerializer.java | 11 +--- .../serialize/PbCodegenSimpleSerializer.java | 8 +-- .../protobuf/serialize/RowToProtoConverter.java | 4 +- .../formats/protobuf/util/PbCodegenUtils.java | 19 +++---- .../flink/formats/protobuf/util/PbFormatUtils.java | 58 +++++++++++++++----- ...RowTest.java => MetaNoMultiProtoToRowTest.java} | 18 ++---- .../formats/protobuf/MetaOuterNoMultiTest.java | 4 +- .../protobuf/SameOuterClassNameProtoToRowTest.java | 61 +++++++++++++++++++++ .../protobuf/SameOuterClassNameRowToProtoTest.java | 56 +++++++++++++++++++ .../formats/protobuf/SimpleProtoToRowTest.java | 45 +++++++++------ .../formats/protobuf/SimpleRowToProtoTest.java | 49 ++++++++++------- .../protobuf/TimestampMultiProtoToRowTest.java | 46 ++++++++++++++++ .../protobuf/TimestampMultiRowToProtoTest.java} | 32 ++++++----- .../protobuf/TimestampNoMultiProtoToRowTest.java | 47 ++++++++++++++++ .../protobuf/TimestampNoMultiRowToProtoTest.java | 44 +++++++++++++++ .../TimestampOuterMultiProtoToRowTest.java | 49 +++++++++++++++++ .../TimestampOuterMultiRowToProtoTest.java | 44 +++++++++++++++ .../TimestampOuterNoMultiProtoToRowTest.java | 47 ++++++++++++++++ .../TimestampOuterNoMultiRowToProtoTest.java | 47 ++++++++++++++++ .../flink-protobuf/src/test/proto/test_map.proto | 18 +++--- .../test/proto/test_multiple_level_message.proto | 26 ++++----- .../flink-protobuf/src/test/proto/test_null.proto | 64 +++++++++++----------- .../flink-protobuf/src/test/proto/test_oneof.proto | 8 +-- .../flink-protobuf/src/test/proto/test_pb3.proto | 48 ++++++++-------- .../src/test/proto/test_repeated.proto | 13 ++--- .../src/test/proto/test_repeated_message.proto | 12 ++-- ...neof.proto => test_same_outer_class_name.proto} | 16 +++--- .../{test_simple.proto => test_simple_multi.proto} | 51 +++++++++-------- .../test/proto/test_simple_no_java_package.proto | 40 +++++++------- ...ter_nomulti.proto => test_simple_nomulti.proto} | 30 ++++++---- .../src/test/proto/test_simple_outer_multi.proto | 23 ++++---- .../src/test/proto/test_simple_outer_nomulti.proto | 26 ++++----- ...test_oneof.proto => test_timestamp_multi.proto} | 10 ++-- ...st_oneof.proto => test_timestamp_nomulti.proto} | 12 ++-- ...sage.proto => test_timestamp_outer_multi.proto} | 14 ++--- ...ge.proto => test_timestamp_outer_nomulti.proto} | 16 ++---- 46 files changed, 806 insertions(+), 352 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/formats/protobuf.md b/docs/content.zh/docs/connectors/table/formats/protobuf.md index 5cbafc8d911..b28cf49f9d3 100644 --- a/docs/content.zh/docs/connectors/table/formats/protobuf.md +++ b/docs/content.zh/docs/connectors/table/formats/protobuf.md @@ -236,6 +236,11 @@ The following table lists the type mapping from Flink type to Protobuf type. <td><code>enum</code></td> <td>The enum value of protobuf can be mapped to string or number of flink row accordingly.</td> </tr> + <tr> + <td><code>ROW<seconds BIGINT, nanos INT></code></td> + <td><code>google.protobuf.timestamp</code></td> + <td>The google.protobuf.timestamp type can be mapped to seconds and fractions of seconds at nanosecond resolution in UTC epoch time using the row type as well as the protobuf definition.</td> + </tr> </tbody> </table> diff --git a/flink-formats/flink-protobuf/pom.xml b/flink-formats/flink-protobuf/pom.xml index be38a1a4242..da593b8bc4a 100644 --- a/flink-formats/flink-protobuf/pom.xml +++ b/flink-formats/flink-protobuf/pom.xml @@ -109,6 +109,7 @@ under the License. </goals> <configuration> <protocArtifact>com.google.protobuf:protoc:${protoc.version}</protocArtifact> + <includeMavenTypes>direct</includeMavenTypes> <inputDirectories> <include>src/test/proto</include> </inputDirectories> diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java index a886768534d..ea7d6514c56 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java @@ -26,4 +26,5 @@ public class PbConstant { public static final String GENERATED_ENCODE_METHOD = "encode"; public static final String PB_MAP_KEY_NAME = "key"; public static final String PB_MAP_VALUE_NAME = "value"; + public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass"; } diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java index 06370f3fa6e..27ceb0fb49d 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java @@ -20,18 +20,12 @@ package org.apache.flink.formats.protobuf; /** store config and common information. */ public class PbFormatContext { - private final String outerPrefix; private final PbFormatConfig pbFormatConfig; - public PbFormatContext(String outerPrefix, PbFormatConfig pbFormatConfig) { - this.outerPrefix = outerPrefix; + public PbFormatContext(PbFormatConfig pbFormatConfig) { this.pbFormatConfig = pbFormatConfig; } - public String getOuterPrefix() { - return outerPrefix; - } - public PbFormatConfig getPbFormatConfig() { return pbFormatConfig; } diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java index 65bc760b0a3..202322edbbf 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenArrayDeserializer.java @@ -50,8 +50,7 @@ public class PbCodegenArrayDeserializer implements PbCodegenDeserializer { PbCodegenAppender appender = new PbCodegenAppender(indent); PbCodegenVarId varUid = PbCodegenVarId.getInstance(); int uid = varUid.getAndIncrement(); - String protoTypeStr = - PbCodegenUtils.getTypeStrFromProto(fd, false, formatContext.getOuterPrefix()); + String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(fd, false); String listPbVar = "list" + uid; String flinkArrVar = "newArr" + uid; String flinkArrEleVar = "subReturnVar" + uid; diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java index 439332e29e7..fed7c84ebba 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenMapDeserializer.java @@ -58,10 +58,8 @@ public class PbCodegenMapDeserializer implements PbCodegenDeserializer { fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME); PbCodegenAppender appender = new PbCodegenAppender(indent); - String pbKeyTypeStr = - PbCodegenUtils.getTypeStrFromProto(keyFd, false, formatContext.getOuterPrefix()); - String pbValueTypeStr = - PbCodegenUtils.getTypeStrFromProto(valueFd, false, formatContext.getOuterPrefix()); + String pbKeyTypeStr = PbCodegenUtils.getTypeStrFromProto(keyFd, false); + String pbValueTypeStr = PbCodegenUtils.getTypeStrFromProto(valueFd, false); String pbMapVar = "pbMap" + uid; String pbMapEntryVar = "pbEntry" + uid; String resultDataMapVar = "resultDataMap" + uid; diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java index 3a5b3113166..d1fc0c60726 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java @@ -54,8 +54,7 @@ public class PbCodegenRowDeserializer implements PbCodegenDeserializer { String flinkRowDataVar = "rowData" + uid; int fieldSize = rowType.getFieldNames().size(); - String pbMessageTypeStr = - PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix()); + String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor); appender.appendLine(pbMessageTypeStr + " " + pbMessageVar + " = " + pbObjectCode); appender.appendLine( "GenericRowData " + flinkRowDataVar + " = new GenericRowData(" + fieldSize + ")"); diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java index 0ecbbaef19e..4564efce2a4 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java @@ -58,8 +58,6 @@ public class ProtoToRowConverter { public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig) throws PbCodegenException { try { - String outerPrefix = - PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName()); Descriptors.Descriptor descriptor = PbFormatUtils.getDescriptor(formatConfig.getMessageClassName()); Class<?> messageClass = @@ -67,7 +65,7 @@ public class ProtoToRowConverter { formatConfig.getMessageClassName(), true, Thread.currentThread().getContextClassLoader()); - String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor, outerPrefix); + String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor); if (descriptor.getFile().getSyntax() == Syntax.PROTO3) { // pb3 always read default values formatConfig = @@ -78,7 +76,7 @@ public class ProtoToRowConverter { formatConfig.getWriteNullStringLiterals()); } PbCodegenAppender codegenAppender = new PbCodegenAppender(); - PbFormatContext pbFormatContext = new PbFormatContext(outerPrefix, formatConfig); + PbFormatContext pbFormatContext = new PbFormatContext(formatConfig); String uuid = UUID.randomUUID().toString().replaceAll("\\-", ""); String generatedClassName = "GeneratedProtoToRow_" + uuid; String generatedPackageName = ProtoToRowConverter.class.getPackage().getName(); diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java index 9d242d96e43..0f2f1bcf068 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenArraySerializer.java @@ -50,8 +50,7 @@ public class PbCodegenArraySerializer implements PbCodegenSerializer { PbCodegenVarId varUid = PbCodegenVarId.getInstance(); int uid = varUid.getAndIncrement(); PbCodegenAppender appender = new PbCodegenAppender(indent); - String protoTypeStr = - PbCodegenUtils.getTypeStrFromProto(fd, false, formatContext.getOuterPrefix()); + String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(fd, false); String pbListVar = "pbList" + uid; String flinkArrayDataVar = "arrData" + uid; String pbElementVar = "elementPbVar" + uid; diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java index c0c9e383a00..3d223218c0c 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenMapSerializer.java @@ -57,10 +57,8 @@ public class PbCodegenMapSerializer implements PbCodegenSerializer { fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME); PbCodegenAppender appender = new PbCodegenAppender(indent); - String keyProtoTypeStr = - PbCodegenUtils.getTypeStrFromProto(keyFd, false, formatContext.getOuterPrefix()); - String valueProtoTypeStr = - PbCodegenUtils.getTypeStrFromProto(valueFd, false, formatContext.getOuterPrefix()); + String keyProtoTypeStr = PbCodegenUtils.getTypeStrFromProto(keyFd, false); + String valueProtoTypeStr = PbCodegenUtils.getTypeStrFromProto(valueFd, false); String flinkKeyArrDataVar = "keyArrData" + uid; String flinkValueArrDataVar = "valueArrData" + uid; diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java index 5f085a779e6..083a628b70d 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java @@ -52,8 +52,7 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer { int uid = varUid.getAndIncrement(); PbCodegenAppender appender = new PbCodegenAppender(indent); String flinkRowDataVar = "rowData" + uid; - String pbMessageTypeStr = - PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix()); + String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor); String messageBuilderVar = "messageBuilder" + uid; appender.appendLine("RowData " + flinkRowDataVar + " = " + flinkObjectCode); appender.appendLine( @@ -71,15 +70,11 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer { String elementPbVar = "elementPbVar" + subUid; String elementPbTypeStr; if (elementFd.isMapField()) { - elementPbTypeStr = - PbCodegenUtils.getTypeStrFromProto( - elementFd, false, formatContext.getOuterPrefix()); + elementPbTypeStr = PbCodegenUtils.getTypeStrFromProto(elementFd, false); } else { elementPbTypeStr = PbCodegenUtils.getTypeStrFromProto( - elementFd, - PbFormatUtils.isArrayType(subType), - formatContext.getOuterPrefix()); + elementFd, PbFormatUtils.isArrayType(subType)); } String strongCamelFieldName = PbFormatUtils.getStrongCamelCaseJsonName(fieldName); diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java index 29d4bc9ebd8..ccf44f8283c 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java @@ -60,9 +60,7 @@ public class PbCodegenSimpleSerializer implements PbCodegenSerializer { case SMALLINT: case TINYINT: if (fd.getJavaType() == JavaType.ENUM) { - String enumTypeStr = - PbFormatUtils.getFullJavaName( - fd.getEnumType(), formatContext.getOuterPrefix()); + String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType()); appender.appendLine( resultVar + " = " @@ -86,9 +84,7 @@ public class PbCodegenSimpleSerializer implements PbCodegenSerializer { appender.appendLine(fromVar + " = " + flinkObjectCode + ".toString()"); if (fd.getJavaType() == JavaType.ENUM) { String enumValueDescVar = "enumValueDesc" + uid; - String enumTypeStr = - PbFormatUtils.getFullJavaName( - fd.getEnumType(), formatContext.getOuterPrefix()); + String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType()); appender.appendLine( "Descriptors.EnumValueDescriptor " + enumValueDescVar diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java index 349049da02d..df701a515a2 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java @@ -55,11 +55,9 @@ public class RowToProtoConverter { public RowToProtoConverter(RowType rowType, PbFormatConfig formatConfig) throws PbCodegenException { try { - String outerPrefix = - PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName()); - PbFormatContext formatContext = new PbFormatContext(outerPrefix, formatConfig); Descriptors.Descriptor descriptor = PbFormatUtils.getDescriptor(formatConfig.getMessageClassName()); + PbFormatContext formatContext = new PbFormatContext(formatConfig); PbCodegenAppender codegenAppender = new PbCodegenAppender(0); String uuid = UUID.randomUUID().toString().replaceAll("\\-", ""); diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java index 9cf262ee4be..042392e302b 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java @@ -78,7 +78,7 @@ public class PbCodegenUtils { * @return The returned code phrase will be used as java type str in codegen sections. * @throws PbCodegenException */ - public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList, String outerPrefix) + public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList) throws PbCodegenException { String typeStr; switch (fd.getJavaType()) { @@ -90,12 +90,12 @@ public class PbCodegenUtils { FieldDescriptor valueFd = fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME); // key and value cannot be repeated - String keyTypeStr = getTypeStrFromProto(keyFd, false, outerPrefix); - String valueTypeStr = getTypeStrFromProto(valueFd, false, outerPrefix); + String keyTypeStr = getTypeStrFromProto(keyFd, false); + String valueTypeStr = getTypeStrFromProto(valueFd, false); typeStr = "Map<" + keyTypeStr + "," + valueTypeStr + ">"; } else { // simple message - typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType(), outerPrefix); + typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType()); } break; case INT: @@ -108,7 +108,7 @@ public class PbCodegenUtils { typeStr = "String"; break; case ENUM: - typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType(), outerPrefix); + typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType()); break; case FLOAT: typeStr = "Float"; @@ -174,11 +174,10 @@ public class PbCodegenUtils { public static String pbDefaultValueCode( FieldDescriptor fieldDescriptor, PbFormatContext pbFormatContext) throws PbCodegenException { - String outerPrefix = pbFormatContext.getOuterPrefix(); String nullLiteral = pbFormatContext.getPbFormatConfig().getWriteNullStringLiterals(); switch (fieldDescriptor.getJavaType()) { case MESSAGE: - return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType(), outerPrefix) + return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType()) + ".getDefaultInstance()"; case INT: return "0"; @@ -187,7 +186,7 @@ public class PbCodegenUtils { case STRING: return "\"" + nullLiteral + "\""; case ENUM: - return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType(), outerPrefix) + return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType()) + ".values()[0]"; case FLOAT: return "0.0f"; @@ -229,9 +228,7 @@ public class PbCodegenUtils { int uid = varUid.getAndIncrement(); String flinkElementVar = "elementVar" + uid; PbCodegenAppender appender = new PbCodegenAppender(indent); - String protoTypeStr = - PbCodegenUtils.getTypeStrFromProto( - elementPbFd, false, pbFormatContext.getOuterPrefix()); + String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(elementPbFd, false); String dataTypeStr = PbCodegenUtils.getTypeStrFromLogicType(elementDataType); appender.appendLine(protoTypeStr + " " + resultPbVar); appender.begin("if(" + flinkArrDataVar + ".isNullAt(" + iVar + ")){"); diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java index 84cd35c98cf..1f972bb5752 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbFormatUtils.java @@ -28,25 +28,25 @@ import com.google.protobuf.ProtobufInternalUtils; /** Protobuf function util. */ public class PbFormatUtils { - public static String getFullJavaName(Descriptors.Descriptor descriptor, String outerProtoName) { + public static String getFullJavaName(Descriptors.Descriptor descriptor) { if (null != descriptor.getContainingType()) { // nested type - String parentJavaFullName = - getFullJavaName(descriptor.getContainingType(), outerProtoName); + String parentJavaFullName = getFullJavaName(descriptor.getContainingType()); return parentJavaFullName + "." + descriptor.getName(); } else { // top level message + String outerProtoName = getOuterProtoPrefix(descriptor.getFile()); return outerProtoName + descriptor.getName(); } } - public static String getFullJavaName( - Descriptors.EnumDescriptor enumDescriptor, String outerProtoName) { + public static String getFullJavaName(Descriptors.EnumDescriptor enumDescriptor) { if (null != enumDescriptor.getContainingType()) { - return getFullJavaName(enumDescriptor.getContainingType(), outerProtoName) + return getFullJavaName(enumDescriptor.getContainingType()) + "." + enumDescriptor.getName(); } else { + String outerProtoName = getOuterProtoPrefix(enumDescriptor.getFile()); return outerProtoName + enumDescriptor.getName(); } } @@ -72,14 +72,46 @@ public class PbFormatUtils { return ProtobufInternalUtils.underScoreToCamelCase(name, true); } - public static String getOuterProtoPrefix(String name) { - name = name.replace('$', '.'); - int index = name.lastIndexOf('.'); - if (index != -1) { - // include dot - return name.substring(0, index + 1); + public static String getOuterClassName(Descriptors.FileDescriptor fileDescriptor) { + if (fileDescriptor.getOptions().hasJavaOuterClassname()) { + return fileDescriptor.getOptions().getJavaOuterClassname(); } else { - return ""; + String[] fileNames = fileDescriptor.getName().split("/"); + String fileName = fileNames[fileNames.length - 1]; + String outerName = getStrongCamelCaseJsonName(fileName.split("\\.")[0]); + // https://developers.google.com/protocol-buffers/docs/reference/java-generated#invocation + // The name of the wrapper class is determined by converting the base name of the .proto + // file to camel case if the java_outer_classname option is not specified. + // For example, foo_bar.proto produces the class name FooBar. If there is a service, + // enum, or message (including nested types) in the file with the same name, + // "OuterClass" will be appended to the wrapper class's name. + boolean hasSameNameMessage = + fileDescriptor.getMessageTypes().stream() + .anyMatch(f -> f.getName().equals(outerName)); + boolean hasSameNameEnum = + fileDescriptor.getEnumTypes().stream() + .anyMatch(f -> f.getName().equals(outerName)); + boolean hasSameNameService = + fileDescriptor.getServices().stream() + .anyMatch(f -> f.getName().equals(outerName)); + if (hasSameNameMessage || hasSameNameEnum || hasSameNameService) { + return outerName + PbConstant.PB_OUTER_CLASS_SUFFIX; + } else { + return outerName; + } + } + } + + public static String getOuterProtoPrefix(Descriptors.FileDescriptor fileDescriptor) { + String javaPackageName = + fileDescriptor.getOptions().hasJavaPackage() + ? fileDescriptor.getOptions().getJavaPackage() + : fileDescriptor.getPackage(); + if (fileDescriptor.getOptions().getJavaMultipleFiles()) { + return javaPackageName + "."; + } else { + String outerClassName = getOuterClassName(fileDescriptor); + return javaPackageName + "." + outerClassName + "."; } } diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoOuterNoMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java similarity index 78% rename from flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoOuterNoMultiProtoToRowTest.java rename to flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java index 32269393e17..bb83169d96c 100644 --- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoOuterNoMultiProtoToRowTest.java +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java @@ -20,7 +20,7 @@ package org.apache.flink.formats.protobuf; import org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema; import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema; -import org.apache.flink.formats.protobuf.testproto.TestSimpleNoouterNomulti; +import org.apache.flink.formats.protobuf.testproto.TestSimpleNomulti; import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; @@ -39,18 +39,15 @@ import org.junit.Test; * * <p>It is valid proto definition. */ -public class MetaNoOuterNoMultiProtoToRowTest { +public class MetaNoMultiProtoToRowTest { @Test public void testSimple() throws Exception { RowType rowType = PbToRowTypeUtil.generateRowType( - TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.getDescriptor()); + TestSimpleNomulti.SimpleTestNoMulti.getDescriptor()); PbFormatConfig formatConfig = new PbFormatConfig( - TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.class.getName(), - false, - false, - ""); + TestSimpleNomulti.SimpleTestNoMulti.class.getName(), false, false, ""); new PbRowDataDeserializationSchema(rowType, InternalTypeInfo.of(rowType), formatConfig) .open(null); @@ -62,13 +59,10 @@ public class MetaNoOuterNoMultiProtoToRowTest { public void testOuterClassName() throws Exception { RowType rowType = PbToRowTypeUtil.generateRowType( - TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.getDescriptor()); + TestSimpleNomulti.SimpleTestNoMulti.getDescriptor()); PbFormatConfig formatConfig = new PbFormatConfig( - TestSimpleNoouterNomulti.SimpleTestNoouterNomulti.class.getName(), - false, - false, - ""); + TestSimpleNomulti.SimpleTestNoMulti.class.getName(), false, false, ""); new PbRowDataDeserializationSchema(rowType, InternalTypeInfo.of(rowType), formatConfig) .open(null); new PbRowDataSerializationSchema(rowType, formatConfig).open(null); diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java index 7e70a5561b5..afcd3498bb0 100644 --- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java @@ -45,10 +45,10 @@ public class MetaOuterNoMultiTest { public void testSimple() throws Exception { RowType rowType = PbToRowTypeUtil.generateRowType( - SimpleTestOuterNomultiProto.SimpleTestOuterNomulti.getDescriptor()); + SimpleTestOuterNomultiProto.SimpleTestOuterNoMulti.getDescriptor()); PbFormatConfig formatConfig = new PbFormatConfig( - SimpleTestOuterNomultiProto.SimpleTestOuterNomulti.class.getName(), + SimpleTestOuterNomultiProto.SimpleTestOuterNoMulti.class.getName(), false, false, ""); diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java new file mode 100644 index 00000000000..b049f2fb907 --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.testproto.TestSameOuterClassNameOuterClass; +import org.apache.flink.table.data.RowData; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** Test conversion of proto same outer class name data to flink internal data. */ +public class SameOuterClassNameProtoToRowTest { + + @Test + public void testSimple() throws Exception { + TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName = + TestSameOuterClassNameOuterClass.TestSameOuterClassName.newBuilder() + .setA(1) + .setB(TestSameOuterClassNameOuterClass.FooBar.BAR) + .build(); + RowData row = + ProtobufTestHelper.pbBytesToRow( + TestSameOuterClassNameOuterClass.TestSameOuterClassName.class, + testSameOuterClassName.toByteArray()); + + assertEquals(1, row.getInt(0)); + assertEquals("BAR", row.getString(1).toString()); + } + + @Test + public void testIntEnum() throws Exception { + TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName = + TestSameOuterClassNameOuterClass.TestSameOuterClassName.newBuilder() + .setB(TestSameOuterClassNameOuterClass.FooBar.BAR) + .build(); + + RowData row = + ProtobufTestHelper.pbBytesToRow( + TestSameOuterClassNameOuterClass.TestSameOuterClassName.class, + testSameOuterClassName.toByteArray(), + true); + assertEquals(1, row.getInt(1)); + } +} diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java new file mode 100644 index 00000000000..9e9e3f7fe69 --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.testproto.TestSameOuterClassNameOuterClass; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** Test conversion of flink internal primitive data to same outer class name proto data. */ +public class SameOuterClassNameRowToProtoTest { + @Test + public void testSimple() throws Exception { + RowData row = GenericRowData.of(1, StringData.fromString("BAR")); + + byte[] bytes = + ProtobufTestHelper.rowToPbBytes( + row, TestSameOuterClassNameOuterClass.TestSameOuterClassName.class); + TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName = + TestSameOuterClassNameOuterClass.TestSameOuterClassName.parseFrom(bytes); + assertEquals(1, testSameOuterClassName.getA()); + assertEquals(TestSameOuterClassNameOuterClass.FooBar.BAR, testSameOuterClassName.getB()); + } + + @Test + public void testEnumAsInt() throws Exception { + RowData row = GenericRowData.of(1, 1); + + byte[] bytes = + ProtobufTestHelper.rowToPbBytes( + row, TestSameOuterClassNameOuterClass.TestSameOuterClassName.class, true); + TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName = + TestSameOuterClassNameOuterClass.TestSameOuterClassName.parseFrom(bytes); + assertEquals(TestSameOuterClassNameOuterClass.FooBar.BAR, testSameOuterClassName.getB()); + } +} diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java index f7cfa331109..2409e60db7e 100644 --- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java @@ -18,7 +18,8 @@ package org.apache.flink.formats.protobuf; -import org.apache.flink.formats.protobuf.testproto.SimpleTest; +import org.apache.flink.formats.protobuf.testproto.SimpleTestMulti; +import org.apache.flink.formats.protobuf.testproto.Status; import org.apache.flink.table.data.RowData; import com.google.protobuf.ByteString; @@ -33,8 +34,8 @@ import static org.junit.Assert.assertTrue; public class SimpleProtoToRowTest { @Test public void testSimple() throws Exception { - SimpleTest simple = - SimpleTest.newBuilder() + SimpleTestMulti simple = + SimpleTestMulti.newBuilder() .setA(1) .setB(2L) .setC(false) @@ -42,14 +43,15 @@ public class SimpleProtoToRowTest { .setE(0.01) .setF("haha") .setG(ByteString.copyFrom(new byte[] {1})) - .setH(SimpleTest.Corpus.IMAGES) + .setH(SimpleTestMulti.Corpus.IMAGES) + .setI(Status.FINISHED) .setFAbc7D(1) // test fieldNameToJsonName .setVpr6S(2) .build(); - RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTest.class, simple.toByteArray()); + RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray()); - assertEquals(10, row.getArity()); + assertEquals(11, row.getArity()); assertEquals(1, row.getInt(0)); assertEquals(2L, row.getLong(1)); assertFalse(row.getBoolean(2)); @@ -58,14 +60,15 @@ public class SimpleProtoToRowTest { assertEquals("haha", row.getString(5).toString()); assertEquals(1, (row.getBinary(6))[0]); assertEquals("IMAGES", row.getString(7).toString()); - assertEquals(1, row.getInt(8)); - assertEquals(2, row.getInt(9)); + assertEquals("FINISHED", row.getString(8).toString()); + assertEquals(1, row.getInt(9)); + assertEquals(2, row.getInt(10)); } @Test public void testNotExistsValueIgnoringDefault() throws Exception { - SimpleTest simple = - SimpleTest.newBuilder() + SimpleTestMulti simple = + SimpleTestMulti.newBuilder() .setB(2L) .setC(false) .setD(0.1f) @@ -73,7 +76,7 @@ public class SimpleProtoToRowTest { .setF("haha") .build(); - RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTest.class, simple.toByteArray()); + RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray()); assertTrue(row.isNullAt(0)); assertFalse(row.isNullAt(1)); @@ -81,13 +84,13 @@ public class SimpleProtoToRowTest { @Test public void testDefaultValues() throws Exception { - SimpleTest simple = SimpleTest.newBuilder().build(); + SimpleTestMulti simple = SimpleTestMulti.newBuilder().build(); RowData row = ProtobufTestHelper.pbBytesToRow( - SimpleTest.class, + SimpleTestMulti.class, simple.toByteArray(), - new PbFormatConfig(SimpleTest.class.getName(), false, true, ""), + new PbFormatConfig(SimpleTestMulti.class.getName(), false, true, ""), false); assertFalse(row.isNullAt(0)); @@ -98,6 +101,7 @@ public class SimpleProtoToRowTest { assertFalse(row.isNullAt(5)); assertFalse(row.isNullAt(6)); assertFalse(row.isNullAt(7)); + assertFalse(row.isNullAt(8)); assertEquals(10, row.getInt(0)); assertEquals(100L, row.getLong(1)); assertFalse(row.getBoolean(2)); @@ -105,13 +109,20 @@ public class SimpleProtoToRowTest { assertEquals(0.0d, row.getDouble(4), 0.0001); assertEquals("f", row.getString(5).toString()); assertArrayEquals(ByteString.EMPTY.toByteArray(), row.getBinary(6)); - assertEquals(SimpleTest.Corpus.UNIVERSAL.toString(), row.getString(7).toString()); + assertEquals(SimpleTestMulti.Corpus.UNIVERSAL.toString(), row.getString(7).toString()); + assertEquals(Status.UNSPECIFIED.toString(), row.getString(8).toString()); } @Test public void testIntEnum() throws Exception { - SimpleTest simple = SimpleTest.newBuilder().setH(SimpleTest.Corpus.IMAGES).build(); - RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTest.class, simple.toByteArray(), true); + SimpleTestMulti simple = + SimpleTestMulti.newBuilder() + .setH(SimpleTestMulti.Corpus.IMAGES) + .setI(Status.STARTED) + .build(); + RowData row = + ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray(), true); assertEquals(2, row.getInt(7)); + assertEquals(1, row.getInt(8)); } } diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java index eccee930892..04b059b8bcc 100644 --- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java @@ -18,7 +18,8 @@ package org.apache.flink.formats.protobuf; -import org.apache.flink.formats.protobuf.testproto.SimpleTest; +import org.apache.flink.formats.protobuf.testproto.SimpleTestMulti; +import org.apache.flink.formats.protobuf.testproto.Status; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; @@ -43,21 +44,23 @@ public class SimpleRowToProtoTest { StringData.fromString("hello"), new byte[] {1}, StringData.fromString("IMAGES"), + StringData.fromString("FINISHED"), 1, 2); - byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTest.class); - SimpleTest simpleTest = SimpleTest.parseFrom(bytes); - assertTrue(simpleTest.hasA()); - assertEquals(1, simpleTest.getA()); - assertEquals(2L, simpleTest.getB()); - assertFalse(simpleTest.getC()); - assertEquals(Float.valueOf(0.1f), Float.valueOf(simpleTest.getD())); - assertEquals(Double.valueOf(0.01d), Double.valueOf(simpleTest.getE())); - assertEquals("hello", simpleTest.getF()); - assertEquals(1, simpleTest.getG().byteAt(0)); - assertEquals(SimpleTest.Corpus.IMAGES, simpleTest.getH()); - assertEquals(1, simpleTest.getFAbc7D()); + byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class); + SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes); + assertTrue(simpleTestMulti.hasA()); + assertEquals(1, simpleTestMulti.getA()); + assertEquals(2L, simpleTestMulti.getB()); + assertFalse(simpleTestMulti.getC()); + assertEquals(Float.valueOf(0.1f), Float.valueOf(simpleTestMulti.getD())); + assertEquals(Double.valueOf(0.01d), Double.valueOf(simpleTestMulti.getE())); + assertEquals("hello", simpleTestMulti.getF()); + assertEquals(1, simpleTestMulti.getG().byteAt(0)); + assertEquals(SimpleTestMulti.Corpus.IMAGES, simpleTestMulti.getH()); + assertEquals(Status.FINISHED, simpleTestMulti.getI()); + assertEquals(1, simpleTestMulti.getFAbc7D()); } @Test @@ -72,14 +75,16 @@ public class SimpleRowToProtoTest { StringData.fromString("hello"), null, null, + null, 1, 2); - byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTest.class); - SimpleTest simpleTest = SimpleTest.parseFrom(bytes); - assertFalse(simpleTest.hasA()); - assertFalse(simpleTest.hasG()); - assertFalse(simpleTest.hasH()); + byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class); + SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes); + assertFalse(simpleTestMulti.hasA()); + assertFalse(simpleTestMulti.hasG()); + assertFalse(simpleTestMulti.hasH()); + assertFalse(simpleTestMulti.hasI()); } @Test @@ -87,10 +92,12 @@ public class SimpleRowToProtoTest { RowData row = GenericRowData.of( null, null, null, null, null, null, null, 2, // CORPUS: IMAGE + 1, // STATUS: STARTED null, null); - byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTest.class, true); - SimpleTest simpleTest = SimpleTest.parseFrom(bytes); - assertEquals(SimpleTest.Corpus.IMAGES, simpleTest.getH()); + byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class, true); + SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes); + assertEquals(SimpleTestMulti.Corpus.IMAGES, simpleTestMulti.getH()); + assertEquals(Status.STARTED, simpleTestMulti.getI()); } } diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java new file mode 100644 index 00000000000..e567a789da1 --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.testproto.TimestampTestMulti; +import org.apache.flink.table.data.RowData; + +import com.google.protobuf.Timestamp; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** Test conversion of proto timestamp data with multiple_files options to flink internal data. */ +public class TimestampMultiProtoToRowTest { + + @Test + public void testSimple() throws Exception { + TimestampTestMulti timestampTestMulti = + TimestampTestMulti.newBuilder() + .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123)) + .build(); + RowData row = + ProtobufTestHelper.pbBytesToRow( + TimestampTestMulti.class, timestampTestMulti.toByteArray()); + + RowData rowData = row.getRow(0, 2); + assertEquals(1672498800, rowData.getLong(0)); + assertEquals(123, rowData.getInt(1)); + } +} diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java similarity index 50% copy from flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java copy to flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java index 06370f3fa6e..213f42661a2 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java @@ -18,21 +18,27 @@ package org.apache.flink.formats.protobuf; -/** store config and common information. */ -public class PbFormatContext { - private final String outerPrefix; - private final PbFormatConfig pbFormatConfig; +import org.apache.flink.formats.protobuf.testproto.TimestampTestMulti; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; - public PbFormatContext(String outerPrefix, PbFormatConfig pbFormatConfig) { - this.outerPrefix = outerPrefix; - this.pbFormatConfig = pbFormatConfig; - } +import org.junit.Test; - public String getOuterPrefix() { - return outerPrefix; - } +import static org.junit.Assert.assertEquals; + +/** + * Test conversion of flink internal primitive data to proto timestamp data with multiple_files + * options. + */ +public class TimestampMultiRowToProtoTest { + + @Test + public void testSimple() throws Exception { + RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123)); - public PbFormatConfig getPbFormatConfig() { - return pbFormatConfig; + byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, TimestampTestMulti.class); + TimestampTestMulti timestampTestMulti = TimestampTestMulti.parseFrom(bytes); + assertEquals(1672498800, timestampTestMulti.getTs().getSeconds()); + assertEquals(123, timestampTestMulti.getTs().getNanos()); } } diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java new file mode 100644 index 00000000000..55917933bc2 --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.testproto.TestTimestampNomulti; +import org.apache.flink.table.data.RowData; + +import com.google.protobuf.Timestamp; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** Test conversion of proto timestamp data to flink internal data. */ +public class TimestampNoMultiProtoToRowTest { + + @Test + public void testSimple() throws Exception { + TestTimestampNomulti.TimestampTestNoMulti timestampTestNoMulti = + TestTimestampNomulti.TimestampTestNoMulti.newBuilder() + .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123)) + .build(); + RowData row = + ProtobufTestHelper.pbBytesToRow( + TestTimestampNomulti.TimestampTestNoMulti.class, + timestampTestNoMulti.toByteArray()); + + RowData rowData = row.getRow(0, 2); + assertEquals(1672498800, rowData.getLong(0)); + assertEquals(123, rowData.getInt(1)); + } +} diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java new file mode 100644 index 00000000000..65cd877b2bf --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.testproto.TestTimestampNomulti; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** Test conversion of flink internal primitive data to proto timestamp data. */ +public class TimestampNoMultiRowToProtoTest { + + @Test + public void testSimple() throws Exception { + RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123)); + + byte[] bytes = + ProtobufTestHelper.rowToPbBytes( + row, TestTimestampNomulti.TimestampTestNoMulti.class); + TestTimestampNomulti.TimestampTestNoMulti timestampTestNoMulti = + TestTimestampNomulti.TimestampTestNoMulti.parseFrom(bytes); + assertEquals(1672498800, timestampTestNoMulti.getTs().getSeconds()); + assertEquals(123, timestampTestNoMulti.getTs().getNanos()); + } +} diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java new file mode 100644 index 00000000000..935c17c169b --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterMulti; +import org.apache.flink.table.data.RowData; + +import com.google.protobuf.Timestamp; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Test conversion of proto timestamp data with multiple_files and outer_classname options to flink + * internal data. + */ +public class TimestampOuterMultiProtoToRowTest { + + @Test + public void testSimple() throws Exception { + TimestampTestOuterMulti timestampTestOuterMulti = + TimestampTestOuterMulti.newBuilder() + .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123)) + .build(); + RowData row = + ProtobufTestHelper.pbBytesToRow( + TimestampTestOuterMulti.class, timestampTestOuterMulti.toByteArray()); + + RowData rowData = row.getRow(0, 2); + assertEquals(1672498800, rowData.getLong(0)); + assertEquals(123, rowData.getInt(1)); + } +} diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java new file mode 100644 index 00000000000..1f27ed60f0f --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterMulti; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Test conversion of flink internal primitive data to proto timestamp data with multiple_files and + * outer_classname options. + */ +public class TimestampOuterMultiRowToProtoTest { + + @Test + public void testSimple() throws Exception { + RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123)); + + byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, TimestampTestOuterMulti.class); + TimestampTestOuterMulti timestampTestOuterMulti = TimestampTestOuterMulti.parseFrom(bytes); + assertEquals(1672498800, timestampTestOuterMulti.getTs().getSeconds()); + assertEquals(123, timestampTestOuterMulti.getTs().getNanos()); + } +} diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java new file mode 100644 index 00000000000..5c2c08fe435 --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterNomultiProto; +import org.apache.flink.table.data.RowData; + +import com.google.protobuf.Timestamp; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** Test conversion of proto timestamp data with outer_classname options to flink internal data. */ +public class TimestampOuterNoMultiProtoToRowTest { + + @Test + public void testSimple() throws Exception { + TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti timestampTestOuterNoMulti = + TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.newBuilder() + .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123)) + .build(); + RowData row = + ProtobufTestHelper.pbBytesToRow( + TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.class, + timestampTestOuterNoMulti.toByteArray()); + + RowData rowData = row.getRow(0, 2); + assertEquals(1672498800, rowData.getLong(0)); + assertEquals(123, rowData.getInt(1)); + } +} diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java new file mode 100644 index 00000000000..208b49ab780 --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterNomultiProto; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Test conversion of flink internal primitive data to proto timestamp data with outer_classname + * options. + */ +public class TimestampOuterNoMultiRowToProtoTest { + + @Test + public void testSimple() throws Exception { + RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123)); + + byte[] bytes = + ProtobufTestHelper.rowToPbBytes( + row, TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.class); + TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti timestampTestOuterNoMulti = + TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.parseFrom(bytes); + assertEquals(1672498800, timestampTestOuterNoMulti.getTs().getSeconds()); + assertEquals(123, timestampTestOuterNoMulti.getTs().getNanos()); + } +} diff --git a/flink-formats/flink-protobuf/src/test/proto/test_map.proto b/flink-formats/flink-protobuf/src/test/proto/test_map.proto index e22f771bfe3..cce7daa1143 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_map.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_map.proto @@ -22,16 +22,14 @@ option java_package = "org.apache.flink.formats.protobuf.testproto"; option java_multiple_files = true; message MapTest { - optional int32 a = 1; - map<string, string> map1 = 2; - map<string, InnerMessageTest> map2 = 3; - map<string, bytes> map3 = 4; + optional int32 a = 1; + map<string, string> map1 = 2; + map<string, InnerMessageTest> map2 = 3; + map<string, bytes> map3 = 4; - message InnerMessageTest{ - optional int32 a =1; - optional int64 b =2; - } + message InnerMessageTest{ + optional int32 a = 1; + optional int64 b = 2; + } } - - diff --git a/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto index ba0c320d35f..0829f26132b 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_multiple_level_message.proto @@ -22,21 +22,17 @@ option java_package = "org.apache.flink.formats.protobuf.testproto"; option java_multiple_files = true; message MultipleLevelMessageTest { - optional int32 a = 1; - optional int64 b = 2; - optional bool c = 3; - optional InnerMessageTest1 d = 4; + optional int32 a = 1; + optional int64 b = 2; + optional bool c = 3; + optional InnerMessageTest1 d = 4; - message InnerMessageTest1{ - optional InnerMessageTest2 a = 1; - optional bool c = 2; - message InnerMessageTest2{ - optional int32 a =1; - optional int64 b =2; - } + message InnerMessageTest1{ + optional InnerMessageTest2 a = 1; + optional bool c = 2; + message InnerMessageTest2{ + optional int32 a = 1; + optional int64 b = 2; } + } } - - - - diff --git a/flink-formats/flink-protobuf/src/test/proto/test_null.proto b/flink-formats/flink-protobuf/src/test/proto/test_null.proto index b17b15336b4..cd3d16d962c 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_null.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_null.proto @@ -22,40 +22,38 @@ option java_package = "org.apache.flink.formats.protobuf.testproto"; option java_multiple_files = true; message NullTest { - map<string, string> string_map = 1; - map<int32, int32> int_map = 2; - map<int64, int64> long_map = 3; - map<bool, bool> boolean_map = 4; - map<string, float> float_map = 5; - map<string, double> double_map = 6; - map<string, Corpus> enum_map = 7; - map<string, InnerMessageTest> message_map = 8; - map<string, bytes> bytes_map=9; + map<string, string> string_map = 1; + map<int32, int32> int_map = 2; + map<int64, int64> long_map = 3; + map<bool, bool> boolean_map = 4; + map<string, float> float_map = 5; + map<string, double> double_map = 6; + map<string, Corpus> enum_map = 7; + map<string, InnerMessageTest> message_map = 8; + map<string, bytes> bytes_map = 9; - repeated string string_array = 10; - repeated int32 int_array = 11; - repeated int64 long_array = 12; - repeated bool boolean_array = 13; - repeated float float_array = 14; - repeated double double_array = 15; - repeated Corpus enum_array = 16; - repeated InnerMessageTest message_array = 17; - repeated bytes bytes_array = 18; + repeated string string_array = 10; + repeated int32 int_array = 11; + repeated int64 long_array = 12; + repeated bool boolean_array = 13; + repeated float float_array = 14; + repeated double double_array = 15; + repeated Corpus enum_array = 16; + repeated InnerMessageTest message_array = 17; + repeated bytes bytes_array = 18; - message InnerMessageTest{ - optional int32 a =1; - optional int64 b =2; - } + message InnerMessageTest{ + optional int32 a = 1; + optional int64 b = 2; + } - enum Corpus { - UNIVERSAL = 0; - WEB = 1; - IMAGES = 2; - LOCAL = 3; - NEWS = 4; - PRODUCTS = 5; - VIDEO = 7; - } + enum Corpus { + UNIVERSAL = 0; + WEB = 1; + IMAGES = 2; + LOCAL = 3; + NEWS = 4; + PRODUCTS = 5; + VIDEO = 7; + } } - - diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto index 814761b2cec..b81cf39a31b 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto @@ -22,8 +22,8 @@ option java_package = "org.apache.flink.formats.protobuf.testproto"; option java_multiple_files = true; message OneofTest { - oneof test_oneof{ - int32 a = 1; - int32 b = 2; - } + oneof test_oneof{ + int32 a = 1; + int32 b = 2; + } } diff --git a/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto b/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto index b21ca840f65..bbc5f4df4b1 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_pb3.proto @@ -22,32 +22,30 @@ option java_package = "org.apache.flink.formats.protobuf.testproto"; option java_multiple_files = true; message Pb3Test { + int32 a = 1; + int64 b = 2; + string c = 3; + float d = 4; + double e = 5; + Corpus f = 6; + InnerMessageTest g = 7; + repeated InnerMessageTest h = 8; + bytes i = 9; + map<string, string> map1 = 10; + map<string, InnerMessageTest> map2 = 11; + + message InnerMessageTest{ int32 a = 1; int64 b = 2; - string c = 3; - float d = 4; - double e = 5; - Corpus f = 6; - InnerMessageTest g = 7; - repeated InnerMessageTest h = 8; - bytes i = 9; - map<string, string> map1 = 10; - map<string, InnerMessageTest> map2 = 11; - - message InnerMessageTest{ - int32 a =1; - int64 b =2; - } + } - enum Corpus { - UNIVERSAL = 0; - WEB = 1; - IMAGES = 2; - LOCAL = 3; - NEWS = 4; - PRODUCTS = 5; - VIDEO = 7; - } + enum Corpus { + UNIVERSAL = 0; + WEB = 1; + IMAGES = 2; + LOCAL = 3; + NEWS = 4; + PRODUCTS = 5; + VIDEO = 7; + } } - - diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto b/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto index 90cb9a58bed..75dff2ef78b 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_repeated.proto @@ -22,11 +22,10 @@ option java_package = "org.apache.flink.formats.protobuf.testproto"; option java_multiple_files = true; message RepeatedTest { - optional int32 a = 1; - repeated int64 b = 2; - optional bool c = 3; - optional float d = 4; - optional double e = 5; - optional string f = 6; + optional int32 a = 1; + repeated int64 b = 2; + optional bool c = 3; + optional float d = 4; + optional double e = 5; + optional string f = 6; } - diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto index 2a73a9a111b..4e0e9694521 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto @@ -22,11 +22,9 @@ option java_package = "org.apache.flink.formats.protobuf.testproto"; option java_multiple_files = true; message RepeatedMessageTest { - repeated InnerMessageTest d = 4; - message InnerMessageTest{ - optional int32 a =1; - optional int64 b =2; - } + repeated InnerMessageTest d = 4; + message InnerMessageTest{ + optional int32 a = 1; + optional int64 b = 2; + } } - - diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_same_outer_class_name.proto similarity index 86% copy from flink-formats/flink-protobuf/src/test/proto/test_oneof.proto copy to flink-formats/flink-protobuf/src/test/proto/test_same_outer_class_name.proto index 814761b2cec..2941b6b65fe 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_same_outer_class_name.proto @@ -16,14 +16,16 @@ * limitations under the License. */ -syntax = "proto2"; +syntax = "proto3"; package org.apache.flink.formats.protobuf.proto; option java_package = "org.apache.flink.formats.protobuf.testproto"; -option java_multiple_files = true; -message OneofTest { - oneof test_oneof{ - int32 a = 1; - int32 b = 2; - } +message TestSameOuterClassName { + int32 a = 1; + FooBar b = 2; +} + +enum FooBar { + FOO = 0; + BAR = 1; } diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_multi.proto similarity index 60% rename from flink-formats/flink-protobuf/src/test/proto/test_simple.proto rename to flink-formats/flink-protobuf/src/test/proto/test_simple_multi.proto index e5d01f740c7..9ee527e0784 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_simple.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_multi.proto @@ -21,29 +21,34 @@ package org.apache.flink.formats.protobuf.proto; option java_package = "org.apache.flink.formats.protobuf.testproto"; option java_multiple_files = true; -message SimpleTest { - optional int32 a = 1 [default=10]; - optional int64 b = 2 [default=100]; - optional bool c = 3; - optional float d = 4; - optional double e = 5; - optional string f = 6 [default="f"];; - optional bytes g = 7; - optional Corpus h = 8; - //this is must because protobuf have some field name parse bug if number is after "_". - optional int32 f_abc_7d = 9; - optional int32 vpr6s = 10; - - enum Corpus { - UNIVERSAL = 0; - WEB = 1; - IMAGES = 2; - LOCAL = 3; - NEWS = 4; - PRODUCTS = 5; - VIDEO = 7; - } +message SimpleTestMulti { + optional int32 a = 1 [default = 10]; + optional int64 b = 2 [default = 100]; + optional bool c = 3; + optional float d = 4; + optional double e = 5; + optional string f = 6 [default = "f"]; + optional bytes g = 7; + optional Corpus h = 8; + optional Status i = 9; + //this is must because protobuf have some field name parse bug if number is after "_". + optional int32 f_abc_7d = 10; + optional int32 vpr6s = 11; + enum Corpus { + UNIVERSAL = 0; + WEB = 1; + IMAGES = 2; + LOCAL = 3; + NEWS = 4; + PRODUCTS = 5; + VIDEO = 7; + } } - +enum Status { + UNSPECIFIED = 0; + STARTED = 1; + RUNNING = 2; + FINISHED = 3; +} diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto index 334ec0d68e4..649400e0780 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_no_java_package.proto @@ -21,26 +21,24 @@ package org.apache.flink.formats.protobuf.proto; option java_multiple_files = true; message SimpleTestNoJavaPackage { - optional int32 a = 1 [default=10]; - optional int64 b = 2 [default=100]; - optional bool c = 3; - optional float d = 4; - optional double e = 5; - optional string f = 6 [default="f"]; - optional bytes g = 7; - optional Corpus h = 8; - //this is must because protobuf have some field name parse bug if number is after "_". - optional int32 f_abc_7d = 9; + optional int32 a = 1 [default = 10]; + optional int64 b = 2 [default = 100]; + optional bool c = 3; + optional float d = 4; + optional double e = 5; + optional string f = 6 [default = "f"]; + optional bytes g = 7; + optional Corpus h = 8; + //this is must because protobuf have some field name parse bug if number is after "_". + optional int32 f_abc_7d = 9; - enum Corpus { - UNIVERSAL = 0; - WEB = 1; - IMAGES = 2; - LOCAL = 3; - NEWS = 4; - PRODUCTS = 5; - VIDEO = 7; - } + enum Corpus { + UNIVERSAL = 0; + WEB = 1; + IMAGES = 2; + LOCAL = 3; + NEWS = 4; + PRODUCTS = 5; + VIDEO = 7; + } } - - diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_noouter_nomulti.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_nomulti.proto similarity index 73% rename from flink-formats/flink-protobuf/src/test/proto/test_simple_noouter_nomulti.proto rename to flink-formats/flink-protobuf/src/test/proto/test_simple_nomulti.proto index 379732d5503..f5ab90002b1 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_simple_noouter_nomulti.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_nomulti.proto @@ -19,20 +19,26 @@ syntax = "proto2"; package org.apache.flink.formats.protobuf.proto; option java_package = "org.apache.flink.formats.protobuf.testproto"; +option java_multiple_files = false; -message SimpleTestNoouterNomulti { +message SimpleTestNoMulti { + optional int32 a = 1; + optional int64 b = 2; + optional bool c = 3; + optional float d = 4; + optional double e = 5; + optional string f = 6; + optional bytes g = 7; + optional InnerMessageTest h = 8; + optional Result i = 9; + + message InnerMessageTest{ optional int32 a = 1; optional int64 b = 2; - optional bool c = 3; - optional float d = 4; - optional double e = 5; - optional string f = 6; - optional bytes g = 7; - optional InnerMessageTest h = 8; - - message InnerMessageTest{ - optional int32 a =1; - optional int64 b =2; - } + } } +enum Result { + SUCCESS = 0; + FAIL = 1; +} diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto index ca024b862d6..b47274a4ff2 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_multi.proto @@ -23,18 +23,17 @@ option java_outer_classname = "SimpleTestOuterMultiProto"; option java_multiple_files = true; message SimpleTestOuterMulti { + optional int32 a = 1; + optional int64 b = 2; + optional bool c = 3; + optional float d = 4; + optional double e = 5; + optional string f = 6; + optional bytes g = 7; + optional InnerMessageTest h = 8; + + message InnerMessageTest{ optional int32 a = 1; optional int64 b = 2; - optional bool c = 3; - optional float d = 4; - optional double e = 5; - optional string f = 6; - optional bytes g = 7; - optional InnerMessageTest h = 8; - - message InnerMessageTest{ - optional int32 a =1; - optional int64 b =2; - } + } } - diff --git a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto index 16459d27250..3185fcaa087 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_simple_outer_nomulti.proto @@ -20,20 +20,20 @@ syntax = "proto2"; package org.apache.flink.formats.protobuf.proto; option java_package = "org.apache.flink.formats.protobuf.testproto"; option java_outer_classname = "SimpleTestOuterNomultiProto"; +option java_multiple_files = false; -message SimpleTestOuterNomulti { +message SimpleTestOuterNoMulti { + optional int32 a = 1; + optional int64 b = 2; + optional bool c = 3; + optional float d = 4; + optional double e = 5; + optional string f = 6; + optional bytes g = 7; + optional InnerMessageTest h = 8; + + message InnerMessageTest{ optional int32 a = 1; optional int64 b = 2; - optional bool c = 3; - optional float d = 4; - optional double e = 5; - optional string f = 6; - optional bytes g = 7; - optional InnerMessageTest h = 8; - - message InnerMessageTest{ - optional int32 a =1; - optional int64 b =2; - } + } } - diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_multi.proto similarity index 88% copy from flink-formats/flink-protobuf/src/test/proto/test_oneof.proto copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_multi.proto index 814761b2cec..f7da5c47e62 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_multi.proto @@ -16,14 +16,12 @@ * limitations under the License. */ -syntax = "proto2"; +syntax = "proto3"; package org.apache.flink.formats.protobuf.proto; +import "google/protobuf/timestamp.proto"; option java_package = "org.apache.flink.formats.protobuf.testproto"; option java_multiple_files = true; -message OneofTest { - oneof test_oneof{ - int32 a = 1; - int32 b = 2; - } +message TimestampTestMulti { + google.protobuf.Timestamp ts = 1; } diff --git a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_nomulti.proto similarity index 84% copy from flink-formats/flink-protobuf/src/test/proto/test_oneof.proto copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_nomulti.proto index 814761b2cec..6db2e9dbe15 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_oneof.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_nomulti.proto @@ -16,14 +16,12 @@ * limitations under the License. */ -syntax = "proto2"; +syntax = "proto3"; package org.apache.flink.formats.protobuf.proto; +import "google/protobuf/timestamp.proto"; option java_package = "org.apache.flink.formats.protobuf.testproto"; -option java_multiple_files = true; +option java_multiple_files = false; -message OneofTest { - oneof test_oneof{ - int32 a = 1; - int32 b = 2; - } +message TimestampTestNoMulti { + google.protobuf.Timestamp ts = 1; } diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_multi.proto similarity index 83% copy from flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_multi.proto index 2a73a9a111b..bc21d85f378 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_multi.proto @@ -16,17 +16,13 @@ * limitations under the License. */ -syntax = "proto2"; +syntax = "proto3"; package org.apache.flink.formats.protobuf.proto; +import "google/protobuf/timestamp.proto"; option java_package = "org.apache.flink.formats.protobuf.testproto"; +option java_outer_classname = "TimestampTestOuterMultiProto"; option java_multiple_files = true; -message RepeatedMessageTest { - repeated InnerMessageTest d = 4; - message InnerMessageTest{ - optional int32 a =1; - optional int64 b =2; - } +message TimestampTestOuterMulti { + google.protobuf.Timestamp ts = 1; } - - diff --git a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_nomulti.proto similarity index 79% copy from flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto copy to flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_nomulti.proto index 2a73a9a111b..5b571e78a12 100644 --- a/flink-formats/flink-protobuf/src/test/proto/test_repeated_message.proto +++ b/flink-formats/flink-protobuf/src/test/proto/test_timestamp_outer_nomulti.proto @@ -16,17 +16,13 @@ * limitations under the License. */ -syntax = "proto2"; +syntax = "proto3"; package org.apache.flink.formats.protobuf.proto; +import "google/protobuf/timestamp.proto"; option java_package = "org.apache.flink.formats.protobuf.testproto"; -option java_multiple_files = true; +option java_outer_classname = "TimestampTestOuterNomultiProto"; +option java_multiple_files = false; -message RepeatedMessageTest { - repeated InnerMessageTest d = 4; - message InnerMessageTest{ - optional int32 a =1; - optional int64 b =2; - } +message TimestampTestOuterNoMulti { + google.protobuf.Timestamp ts = 1; } - -