This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 94afcb9e0ed [FLINK-38547] [flink-protobuf] Upgrade protobuf to 4.32.1 
(#27137)
94afcb9e0ed is described below

commit 94afcb9e0ed30f5998aba2dc1a8e2ab34814eb55
Author: Khaled Hammouda <[email protected]>
AuthorDate: Thu Nov 6 10:29:39 2025 -0500

    [FLINK-38547] [flink-protobuf] Upgrade protobuf to 4.32.1 (#27137)
---
 .../docs/connectors/table/formats/protobuf.md      |  42 +-
 docs/content/release-notes/flink-2.1.md            |   2 +-
 .../parquet/protobuf/ParquetProtoWriters.java      |   4 +-
 .../parquet/protobuf/PatchedProtoWriteSupport.java | 881 +++++++++++++++++++++
 .../protobuf/PatchedProtoWriteSupportTest.java     | 267 +++++++
 .../src/test/resources/protobuf/test_proto2.proto  |   9 +
 .../protobuf/deserialize/ProtoToRowConverter.java  |   3 +-
 .../src/main/resources/META-INF/NOTICE             |   2 +-
 .../python/beam/BeamTablePythonFunctionRunner.java |  10 +-
 flink-python/src/main/resources/META-INF/NOTICE    |   2 +-
 pom.xml                                            |   2 +-
 11 files changed, 1206 insertions(+), 18 deletions(-)

diff --git a/docs/content/docs/connectors/table/formats/protobuf.md 
b/docs/content/docs/connectors/table/formats/protobuf.md
index 72d4aae3c3d..33e6d3fb41f 100644
--- a/docs/content/docs/connectors/table/formats/protobuf.md
+++ b/docs/content/docs/connectors/table/formats/protobuf.md
@@ -151,10 +151,8 @@ Format Options
       <td>
           If this value is set to true, the format will read empty values as 
the default values defined in the proto file.
           If the value is set to false, the format will generate null values 
if the data element does not exist in the binary protobuf message.
-          If proto syntax is proto3, users need to set this to true when using 
protobuf versions lower than 3.15 as older versions do not support 
-          checking for field presence which can cause runtime compilation 
issues. Additionally, primtive types will be set to default values 
-          instead of null as field presence cannot be checked for them. Please 
be aware that setting this to true will cause the deserialization 
-          performance to be much slower depending on schema complexity and 
message size.
+          With Flink's current protobuf version (4.32.1), field presence is 
properly supported for proto3, allowing null handling for non-primitive types.
+          Please be aware that setting this to true will cause the 
deserialization performance to be much slower depending on schema complexity 
and message size.
       </td>
     </tr>
     <tr>
@@ -291,4 +289,38 @@ OneOf field
 In the serialization process, there's no guarantee that the Flink fields of 
the same one-of group only contain at most one valid value.
 When serializing, each field is set in the order of Flink schema, so the field 
in the higher position will override the field in lower position in the same 
one-of group.
 
-You can refer to [Language Guide 
(proto2)](https://developers.google.com/protocol-buffers/docs/proto) or 
[Language Guide 
(proto3)](https://developers.google.com/protocol-buffers/docs/proto3) for more 
information about Protobuf types.
+Supported Protobuf Versions
+------------
+
+Flink uses protobuf-java 4.32.1 (corresponding to Protocol Buffers version 
32), which includes support for:
+
+- **Proto2 and Proto3 syntax**: Traditional `syntax = "proto2"` and `syntax = 
"proto3"` definitions
+- **Protobuf Editions**: The new `edition = "2023"` and `edition = "2024"` 
syntax introduced in Protocol Buffers v27+
+- **Improved proto3 field presence detection**: Better handling of optional 
fields without the limitations of older protobuf versions
+
+### Using Protobuf Editions
+
+Protobuf Editions provide a unified syntax that combines proto2 and proto3 
functionality. If you're using Editions in your `.proto` files, Flink fully 
supports them:
+
+```
+edition = "2023";
+package com.example;
+option java_package = "com.example";
+option java_multiple_files = true;
+
+message SimpleTest {
+    int64 uid = 1;
+    string name = 2 [features.field_presence = EXPLICIT];
+    // ... rest of your message definition
+}
+```
+
+Editions allow fine-grained control over feature behavior at the file, 
message, or field level, while maintaining backward compatibility with proto2 
and proto3. For more information, see the [Protobuf Editions 
documentation](https://protobuf.dev/editions/overview/).
+
+Additional Resources
+----------------
+For more information about Protocol Buffers, refer to:
+- [Language Guide 
(proto2)](https://developers.google.com/protocol-buffers/docs/proto)
+- [Language Guide 
(proto3)](https://developers.google.com/protocol-buffers/docs/proto3)
+- [Language Guide 
(Editions)](https://protobuf.dev/programming-guides/editions/) - for the new 
Editions syntax
+- [Protobuf Editions Overview](https://protobuf.dev/editions/overview/) - 
understand the motivation and benefits of Editions
diff --git a/docs/content/release-notes/flink-2.1.md 
b/docs/content/release-notes/flink-2.1.md
index b5e1b4f2f80..62dcce5bf44 100644
--- a/docs/content/release-notes/flink-2.1.md
+++ b/docs/content/release-notes/flink-2.1.md
@@ -182,4 +182,4 @@ Bump flink-shaded version to 20.0 to support Smile format.
 ##### [FLINK-37760](https://issues.apache.org/jira/browse/FLINK-37760)
 
 Bump parquet version to 1.15.3 to resolve parquet-avro module
-vulnerability found in 
[CVE-2025-30065](https://nvd.nist.gov/vuln/detail/CVE-2025-30065).
\ No newline at end of file
+vulnerability found in 
[CVE-2025-30065](https://nvd.nist.gov/vuln/detail/CVE-2025-30065).
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java
index 698b32b14ea..4138cdda44a 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.OutputFile;
-import org.apache.parquet.proto.ProtoWriteSupport;
 
 /** Convenience builder for creating {@link ParquetWriterFactory} instances 
for Protobuf classes. */
 public class ParquetProtoWriters {
@@ -62,7 +61,8 @@ public class ParquetProtoWriters {
 
         @Override
         protected WriteSupport<T> getWriteSupport(Configuration conf) {
-            return new ProtoWriteSupport<>(clazz);
+            // Use patched implementation compatible with protobuf 4.x
+            return new PatchedProtoWriteSupport<>(clazz);
         }
     }
 
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupport.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupport.java
new file mode 100644
index 00000000000..dbd8ec1d756
--- /dev/null
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupport.java
@@ -0,0 +1,881 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.protobuf;
+
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageOrBuilder;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.UInt32Value;
+import com.google.protobuf.UInt64Value;
+import com.google.protobuf.util.Timestamps;
+import com.google.type.Date;
+import com.google.type.TimeOfDay;
+import com.twitter.elephantbird.util.Protobufs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.proto.ProtoReadSupport;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.IncompatibleSchemaModificationException;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Optional.ofNullable;
+import static 
org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR;
+import static 
org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR;
+import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX;
+
+/**
+ * Implementation of {@link WriteSupport} for writing Protocol Buffers.
+ *
+ * <p>NOTE: This is a vendored patched version of ProtoWriteSupport to work 
with protobuf 4.x. The
+ * patch replaces the deprecated/removed enum based syntax detection with a 
string based approach
+ * compatible with protobuf 3 and 4.See parquet-java issue
+ * https://github.com/apache/parquet-java/issues/3175.
+ *
+ * <p>The original source can be found here:
+ * 
https://github.com/apache/parquet-java/blob/apache-parquet-1.15.2/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
+ *
+ * <p>Patched code is marked with BEGIN PATCH / END PATCH comments in the 
source.
+ */
+class PatchedProtoWriteSupport<T extends MessageOrBuilder> extends 
WriteSupport<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PatchedProtoWriteSupport.class);
+    public static final String PB_CLASS_WRITE = "parquet.proto.writeClass";
+    // PARQUET-968 introduces changes to allow writing specs compliant schemas 
with
+    // parquet-protobuf.
+    // In the past, collection were not written using the LIST and MAP 
wrappers and thus were not
+    // compliant
+    // with the parquet specs. This flag, is set to true, allows to write 
using spec compliant
+    // schemas
+    // but is set to false by default to keep backward compatibility.
+    public static final String PB_SPECS_COMPLIANT_WRITE = 
"parquet.proto.writeSpecsCompliant";
+
+    public static final String PB_UNWRAP_PROTO_WRAPPERS = 
"parquet.proto.unwrapProtoWrappers";
+
+    private boolean writeSpecsCompliant = false;
+    private boolean unwrapProtoWrappers = false;
+    private RecordConsumer recordConsumer;
+    private Class<? extends Message> protoMessage;
+    private Descriptor descriptor;
+    private MessageWriter messageWriter;
+    // Keep protobuf enum value with number in the metadata, so that in read 
time, a reader can read
+    // at least
+    // the number back even with an outdated schema which might not contain 
all enum values.
+    private Map<String, Map<String, Integer>> protoEnumBookKeeper = new 
HashMap<>();
+
+    public PatchedProtoWriteSupport() {}
+
+    public PatchedProtoWriteSupport(Class<? extends Message> protobufClass) {
+        this.protoMessage = protobufClass;
+    }
+
+    public PatchedProtoWriteSupport(Descriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    @Override
+    public String getName() {
+        return "protobuf";
+    }
+
+    public static void setSchema(Configuration configuration, Class<? extends 
Message> protoClass) {
+        configuration.setClass(PB_CLASS_WRITE, protoClass, Message.class);
+    }
+
+    /**
+     * Make parquet-protobuf use the LIST and MAP wrappers for collections. 
Set to false if you need
+     * backward compatibility with parquet before PARQUET-968 (1.9.0 and 
older).
+     *
+     * @param configuration The hadoop configuration
+     * @param writeSpecsCompliant If set to true, the old schema style will be 
used (without
+     *     wrappers).
+     */
+    public static void setWriteSpecsCompliant(
+            Configuration configuration, boolean writeSpecsCompliant) {
+        configuration.setBoolean(PB_SPECS_COMPLIANT_WRITE, 
writeSpecsCompliant);
+    }
+
+    public static void setUnwrapProtoWrappers(
+            Configuration configuration, boolean unwrapProtoWrappers) {
+        configuration.setBoolean(PB_UNWRAP_PROTO_WRAPPERS, 
unwrapProtoWrappers);
+    }
+
+    /**
+     * Writes Protocol buffer to parquet file.
+     *
+     * @param record instance of Message.Builder or Message.
+     */
+    @Override
+    public void write(T record) {
+        recordConsumer.startMessage();
+        try {
+            messageWriter.writeTopLevelMessage(record);
+        } catch (RuntimeException e) {
+            Message m =
+                    (record instanceof Message.Builder)
+                            ? ((Message.Builder) record).build()
+                            : (Message) record;
+            LOG.error("Cannot write message {}: {}", e.getMessage(), m);
+            throw e;
+        }
+        recordConsumer.endMessage();
+    }
+
+    @Override
+    public void prepareForWrite(RecordConsumer recordConsumer) {
+        this.recordConsumer = recordConsumer;
+    }
+
+    @Override
+    public WriteContext init(Configuration configuration) {
+        return init(new HadoopParquetConfiguration(configuration));
+    }
+
+    @Override
+    public WriteContext init(ParquetConfiguration configuration) {
+
+        Map<String, String> extraMetaData = new HashMap<>();
+
+        // if no protobuf descriptor was given in constructor, load descriptor 
from configuration
+        // (set with
+        // setProtobufClass)
+        if (descriptor == null) {
+            if (protoMessage == null) {
+                Class<? extends Message> pbClass =
+                        configuration.getClass(PB_CLASS_WRITE, null, 
Message.class);
+                if (pbClass != null) {
+                    protoMessage = pbClass;
+                } else {
+                    String msg = "Protocol buffer class or descriptor not 
specified.";
+                    String hint =
+                            " Please use method 
ProtoParquetOutputFormat.setProtobufClass(...) or other similar method.";
+                    throw new BadConfigurationException(msg + hint);
+                }
+            }
+            descriptor = Protobufs.getMessageDescriptor(protoMessage);
+            extraMetaData.put(ProtoReadSupport.PB_CLASS, 
protoMessage.getName());
+        }
+
+        unwrapProtoWrappers =
+                configuration.getBoolean(PB_UNWRAP_PROTO_WRAPPERS, 
unwrapProtoWrappers);
+        writeSpecsCompliant =
+                configuration.getBoolean(PB_SPECS_COMPLIANT_WRITE, 
writeSpecsCompliant);
+        MessageType rootSchema = new 
PatchedProtoSchemaConverter(configuration).convert(descriptor);
+        validatedMapping(descriptor, rootSchema);
+
+        this.messageWriter = new MessageWriter(descriptor, rootSchema);
+
+        extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, 
descriptor.toProto().toString());
+        extraMetaData.put(PB_SPECS_COMPLIANT_WRITE, 
String.valueOf(writeSpecsCompliant));
+        extraMetaData.put(PB_UNWRAP_PROTO_WRAPPERS, 
String.valueOf(unwrapProtoWrappers));
+        return new WriteContext(rootSchema, extraMetaData);
+    }
+
+    @Override
+    public FinalizedWriteContext finalizeWrite() {
+        Map<String, String> protoMetadata = enumMetadata();
+        return new FinalizedWriteContext(protoMetadata);
+    }
+
+    private Map<String, String> enumMetadata() {
+        Map<String, String> enumMetadata = new HashMap<>();
+        for (Map.Entry<String, Map<String, Integer>> enumNameNumberMapping :
+                protoEnumBookKeeper.entrySet()) {
+            StringBuilder nameNumberPairs = new StringBuilder();
+            if (enumNameNumberMapping.getValue().isEmpty()) {
+                // No enum is ever written to any column of this file, put an 
empty string as the
+                // value in the metadata
+                LOG.info("No enum is written for {}", 
enumNameNumberMapping.getKey());
+            }
+            int idx = 0;
+            for (Map.Entry<String, Integer> nameNumberPair :
+                    enumNameNumberMapping.getValue().entrySet()) {
+                nameNumberPairs
+                        .append(nameNumberPair.getKey())
+                        .append(METADATA_ENUM_KEY_VALUE_SEPARATOR)
+                        .append(nameNumberPair.getValue());
+                idx++;
+                if (idx < enumNameNumberMapping.getValue().size()) {
+                    nameNumberPairs.append(METADATA_ENUM_ITEM_SEPARATOR);
+                }
+            }
+            enumMetadata.put(
+                    METADATA_ENUM_PREFIX + enumNameNumberMapping.getKey(),
+                    nameNumberPairs.toString());
+        }
+        return enumMetadata;
+    }
+
+    class FieldWriter {
+        String fieldName;
+        int index = -1;
+
+        void setFieldName(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        /** Sets index of field inside parquet message. */
+        void setIndex(int index) {
+            this.index = index;
+        }
+
+        /** Used for writing repeated fields. */
+        void writeRawValue(Object value) {}
+
+        /** Used for writing nonrepeated (optional, required) fields. */
+        void writeField(Object value) {
+            if (!(this instanceof PatchedProtoWriteSupport.MapWriter)) {
+                recordConsumer.startField(fieldName, index);
+            }
+            writeRawValue(value);
+            if (!(this instanceof PatchedProtoWriteSupport.MapWriter)) {
+                recordConsumer.endField(fieldName, index);
+            }
+        }
+    }
+
+    class MessageWriter extends FieldWriter {
+
+        final FieldWriter[] fieldWriters;
+
+        @SuppressWarnings("unchecked")
+        MessageWriter(Descriptor descriptor, GroupType schema) {
+            List<FieldDescriptor> fields = descriptor.getFields();
+            fieldWriters = (FieldWriter[]) 
Array.newInstance(FieldWriter.class, fields.size());
+
+            for (FieldDescriptor fieldDescriptor : fields) {
+                String name = fieldDescriptor.getName();
+                Type type = schema.getType(name);
+                FieldWriter writer = createWriter(fieldDescriptor, type);
+
+                if (writeSpecsCompliant
+                        && fieldDescriptor.isRepeated()
+                        && !fieldDescriptor.isMapField()) {
+                    writer = new ArrayWriter(writer);
+                } else if (!writeSpecsCompliant && 
fieldDescriptor.isRepeated()) {
+                    // the old schemas style used to write maps as repeated 
fields instead of
+                    // wrapping them in a LIST
+                    writer = new RepeatedWriter(writer);
+                }
+
+                writer.setFieldName(name);
+                writer.setIndex(schema.getFieldIndex(name));
+
+                fieldWriters[fieldDescriptor.getIndex()] = writer;
+            }
+        }
+
+        private FieldWriter createWriter(FieldDescriptor fieldDescriptor, Type 
type) {
+
+            switch (fieldDescriptor.getJavaType()) {
+                case STRING:
+                    return new StringWriter();
+                case MESSAGE:
+                    return createMessageWriter(fieldDescriptor, type);
+                case INT:
+                    return new IntWriter();
+                case LONG:
+                    return new LongWriter();
+                case FLOAT:
+                    return new FloatWriter();
+                case DOUBLE:
+                    return new DoubleWriter();
+                case ENUM:
+                    return new EnumWriter(fieldDescriptor.getEnumType());
+                case BOOLEAN:
+                    return new BooleanWriter();
+                case BYTE_STRING:
+                    return new BinaryWriter();
+            }
+
+            return unknownType(fieldDescriptor); // should not be executed, 
always throws exception.
+        }
+
+        private FieldWriter createMessageWriter(FieldDescriptor 
fieldDescriptor, Type type) {
+            if (fieldDescriptor.isMapField() && writeSpecsCompliant) {
+                return createMapWriter(fieldDescriptor, type);
+            }
+
+            if (unwrapProtoWrappers) {
+                Descriptor messageType = fieldDescriptor.getMessageType();
+                if (messageType.equals(Timestamp.getDescriptor())) {
+                    return new TimestampWriter();
+                }
+                if (messageType.equals(Date.getDescriptor())) {
+                    return new DateWriter();
+                }
+                if (messageType.equals(TimeOfDay.getDescriptor())) {
+                    return new TimeWriter();
+                }
+                if (messageType.equals(DoubleValue.getDescriptor())) {
+                    return new DoubleValueWriter();
+                }
+                if (messageType.equals(FloatValue.getDescriptor())) {
+                    return new FloatValueWriter();
+                }
+                if (messageType.equals(Int64Value.getDescriptor())) {
+                    return new Int64ValueWriter();
+                }
+                if (messageType.equals(UInt64Value.getDescriptor())) {
+                    return new UInt64ValueWriter();
+                }
+                if (messageType.equals(Int32Value.getDescriptor())) {
+                    return new Int32ValueWriter();
+                }
+                if (messageType.equals(UInt32Value.getDescriptor())) {
+                    return new UInt32ValueWriter();
+                }
+                if (messageType.equals(BoolValue.getDescriptor())) {
+                    return new BoolValueWriter();
+                }
+                if (messageType.equals(StringValue.getDescriptor())) {
+                    return new StringValueWriter();
+                }
+                if (messageType.equals(BytesValue.getDescriptor())) {
+                    return new BytesValueWriter();
+                }
+            }
+
+            // This can happen now that recursive schemas get truncated to 
bytes.  Write the bytes.
+            if (type.isPrimitive()
+                    && type.asPrimitiveType().getPrimitiveTypeName()
+                            == PrimitiveType.PrimitiveTypeName.BINARY) {
+                return new BinaryWriter();
+            }
+
+            return new MessageWriter(fieldDescriptor.getMessageType(), 
getGroupType(type));
+        }
+
+        private GroupType getGroupType(Type type) {
+            LogicalTypeAnnotation logicalTypeAnnotation = 
type.getLogicalTypeAnnotation();
+            if (logicalTypeAnnotation == null) {
+                return type.asGroupType();
+            }
+            return logicalTypeAnnotation
+                    .accept(
+                            new 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<GroupType>() {
+                                @Override
+                                public Optional<GroupType> visit(
+                                        
LogicalTypeAnnotation.ListLogicalTypeAnnotation
+                                                listLogicalType) {
+                                    return ofNullable(
+                                            type.asGroupType()
+                                                    .getType("list")
+                                                    .asGroupType()
+                                                    .getType("element")
+                                                    .asGroupType());
+                                }
+
+                                @Override
+                                public Optional<GroupType> visit(
+                                        
LogicalTypeAnnotation.MapLogicalTypeAnnotation
+                                                mapLogicalType) {
+                                    return ofNullable(
+                                            type.asGroupType()
+                                                    .getType("key_value")
+                                                    .asGroupType()
+                                                    .getType("value")
+                                                    .asGroupType());
+                                }
+                            })
+                    .orElse(type.asGroupType());
+        }
+
+        private MapWriter createMapWriter(FieldDescriptor fieldDescriptor, 
Type type) {
+            List<FieldDescriptor> fields = 
fieldDescriptor.getMessageType().getFields();
+            if (fields.size() != 2) {
+                throw new UnsupportedOperationException(
+                        "Expected two fields for the map (key/value), but got: 
" + fields);
+            }
+
+            // KeyFieldWriter
+            FieldDescriptor keyProtoField = fields.get(0);
+            FieldWriter keyWriter = createWriter(keyProtoField, type);
+            keyWriter.setFieldName(keyProtoField.getName());
+            keyWriter.setIndex(0);
+
+            // ValueFieldWriter
+            FieldDescriptor valueProtoField = fields.get(1);
+            FieldWriter valueWriter = createWriter(valueProtoField, type);
+            valueWriter.setFieldName(valueProtoField.getName());
+            valueWriter.setIndex(1);
+
+            return new MapWriter(keyWriter, valueWriter);
+        }
+
+        /** Writes top level message. It cannot call startGroup() */
+        void writeTopLevelMessage(Object value) {
+            writeAllFields((MessageOrBuilder) value);
+        }
+
+        /** Writes message as part of repeated field. It cannot start field */
+        @Override
+        final void writeRawValue(Object value) {
+            recordConsumer.startGroup();
+            writeAllFields((MessageOrBuilder) value);
+            recordConsumer.endGroup();
+        }
+
+        /** Used for writing nonrepeated (optional, required) fields. */
+        @Override
+        final void writeField(Object value) {
+            recordConsumer.startField(fieldName, index);
+            writeRawValue(value);
+            recordConsumer.endField(fieldName, index);
+        }
+
+        private void writeAllFields(MessageOrBuilder pb) {
+            Descriptor messageDescriptor = pb.getDescriptorForType();
+            // 
============================================================================
+            // BEGIN PATCH: Replace enum-based syntax detection with 
string-based approach
+            // 
============================================================================
+            String syntax = messageDescriptor.getFile().toProto().getSyntax();
+
+            // Check for editions syntax (not supported)
+            if ("editions".equals(syntax)) {
+                throw new UnsupportedOperationException(
+                        "Protocol Buffers editions syntax is not supported");
+            }
+
+            // proto2 uses empty string or "proto2", proto3 uses "proto3"
+            boolean isProto2 = syntax.isEmpty() || "proto2".equals(syntax);
+
+            if (isProto2) {
+                // 
============================================================================
+                // END PATCH
+                // 
============================================================================
+                // Returns changed fields with values. Map is ordered by id.
+                Map<FieldDescriptor, Object> changedPbFields = 
pb.getAllFields();
+
+                for (Map.Entry<FieldDescriptor, Object> entry : 
changedPbFields.entrySet()) {
+                    FieldDescriptor fieldDescriptor = entry.getKey();
+
+                    if (fieldDescriptor.isExtension()) {
+                        // Field index of an extension field might overlap 
with a base field.
+                        throw new UnsupportedOperationException(
+                                "Cannot convert Protobuf message with 
extension field(s)");
+                    }
+
+                    int fieldIndex = fieldDescriptor.getIndex();
+                    fieldWriters[fieldIndex].writeField(entry.getValue());
+                }
+            } else {
+                // proto3
+                List<FieldDescriptor> fieldDescriptors = 
messageDescriptor.getFields();
+                for (FieldDescriptor fieldDescriptor : fieldDescriptors) {
+                    FieldDescriptor.Type type = fieldDescriptor.getType();
+
+                    // For a field in a oneOf that isn't set don't write 
anything
+                    if (fieldDescriptor.getContainingOneof() != null
+                            && !pb.hasField(fieldDescriptor)) {
+                        continue;
+                    }
+
+                    if (!fieldDescriptor.isRepeated()
+                            && FieldDescriptor.Type.MESSAGE.equals(type)
+                            && !pb.hasField(fieldDescriptor)) {
+                        continue;
+                    }
+                    int fieldIndex = fieldDescriptor.getIndex();
+                    FieldWriter fieldWriter = fieldWriters[fieldIndex];
+                    fieldWriter.writeField(pb.getField(fieldDescriptor));
+                }
+            }
+        }
+    }
+
+    class ArrayWriter extends FieldWriter {
+        final FieldWriter fieldWriter;
+
+        ArrayWriter(FieldWriter fieldWriter) {
+            this.fieldWriter = fieldWriter;
+        }
+
+        @Override
+        final void writeRawValue(Object value) {
+            throw new UnsupportedOperationException("Array has no raw value");
+        }
+
+        @Override
+        final void writeField(Object value) {
+            List<?> list = (List<?>) value;
+            if (list.isEmpty()) {
+                return;
+            }
+
+            recordConsumer.startField(fieldName, index);
+            recordConsumer.startGroup();
+
+            recordConsumer.startField("list", 0); // This is the wrapper group 
for the array field
+            for (Object listEntry : list) {
+                recordConsumer.startGroup();
+                recordConsumer.startField("element", 0); // This is the 
mandatory inner field
+
+                fieldWriter.writeRawValue(listEntry);
+
+                recordConsumer.endField("element", 0);
+                recordConsumer.endGroup();
+            }
+            recordConsumer.endField("list", 0);
+
+            recordConsumer.endGroup();
+            recordConsumer.endField(fieldName, index);
+        }
+    }
+
+    /**
+     * The RepeatedWriter is used to write collections (lists and maps) using 
the old style (without
+     * LIST and MAP wrappers).
+     */
+    class RepeatedWriter extends FieldWriter {
+        final FieldWriter fieldWriter;
+
+        RepeatedWriter(FieldWriter fieldWriter) {
+            this.fieldWriter = fieldWriter;
+        }
+
+        @Override
+        final void writeRawValue(Object value) {
+            throw new UnsupportedOperationException("Array has no raw value");
+        }
+
+        @Override
+        final void writeField(Object value) {
+            List<?> list = (List<?>) value;
+            if (list.isEmpty()) {
+                return;
+            }
+
+            recordConsumer.startField(fieldName, index);
+
+            for (Object listEntry : list) {
+                fieldWriter.writeRawValue(listEntry);
+            }
+
+            recordConsumer.endField(fieldName, index);
+        }
+    }
+
+    /** validates mapping between protobuffer fields and parquet fields. */
+    private void validatedMapping(Descriptor descriptor, GroupType 
parquetSchema) {
+        List<FieldDescriptor> allFields = descriptor.getFields();
+
+        for (FieldDescriptor fieldDescriptor : allFields) {
+            String fieldName = fieldDescriptor.getName();
+            int fieldIndex = fieldDescriptor.getIndex();
+            int parquetIndex = parquetSchema.getFieldIndex(fieldName);
+            if (fieldIndex != parquetIndex) {
+                String message =
+                        "FieldIndex mismatch name="
+                                + fieldName
+                                + ": "
+                                + fieldIndex
+                                + " != "
+                                + parquetIndex;
+                throw new IncompatibleSchemaModificationException(message);
+            }
+        }
+    }
+
+    class StringWriter extends FieldWriter {
+        @Override
+        final void writeRawValue(Object value) {
+            Binary binaryString = Binary.fromString((String) value);
+            recordConsumer.addBinary(binaryString);
+        }
+    }
+
+    class IntWriter extends FieldWriter {
+        @Override
+        final void writeRawValue(Object value) {
+            recordConsumer.addInteger((Integer) value);
+        }
+    }
+
+    class LongWriter extends FieldWriter {
+
+        @Override
+        final void writeRawValue(Object value) {
+            recordConsumer.addLong((Long) value);
+        }
+    }
+
+    class MapWriter extends FieldWriter {
+
+        private final FieldWriter keyWriter;
+        private final FieldWriter valueWriter;
+
+        public MapWriter(FieldWriter keyWriter, FieldWriter valueWriter) {
+            super();
+            this.keyWriter = keyWriter;
+            this.valueWriter = valueWriter;
+        }
+
+        @Override
+        final void writeRawValue(Object value) {
+            Collection<Message> collection = (Collection<Message>) value;
+            if (collection.isEmpty()) {
+                return;
+            }
+            recordConsumer.startField(fieldName, index);
+            recordConsumer.startGroup();
+
+            recordConsumer.startField(
+                    "key_value", 0); // This is the wrapper group for the map 
field
+            for (Message msg : collection) {
+                recordConsumer.startGroup();
+
+                final Descriptor descriptorForType = 
msg.getDescriptorForType();
+                final FieldDescriptor keyDesc = 
descriptorForType.findFieldByName("key");
+                final FieldDescriptor valueDesc = 
descriptorForType.findFieldByName("value");
+
+                keyWriter.writeField(msg.getField(keyDesc));
+                valueWriter.writeField(msg.getField(valueDesc));
+
+                recordConsumer.endGroup();
+            }
+
+            recordConsumer.endField("key_value", 0);
+
+            recordConsumer.endGroup();
+            recordConsumer.endField(fieldName, index);
+        }
+    }
+
+    class FloatWriter extends FieldWriter {
+        @Override
+        final void writeRawValue(Object value) {
+            recordConsumer.addFloat((Float) value);
+        }
+    }
+
+    class DoubleWriter extends FieldWriter {
+        @Override
+        final void writeRawValue(Object value) {
+            recordConsumer.addDouble((Double) value);
+        }
+    }
+
+    class EnumWriter extends FieldWriter {
+        Map<String, Integer> enumNameNumberPairs;
+
+        public EnumWriter(Descriptors.EnumDescriptor enumType) {
+            if (protoEnumBookKeeper.containsKey(enumType.getFullName())) {
+                enumNameNumberPairs = 
protoEnumBookKeeper.get(enumType.getFullName());
+            } else {
+                enumNameNumberPairs = new HashMap<>();
+                protoEnumBookKeeper.put(enumType.getFullName(), 
enumNameNumberPairs);
+            }
+        }
+
+        @Override
+        final void writeRawValue(Object value) {
+            Descriptors.EnumValueDescriptor enumValueDesc = 
(Descriptors.EnumValueDescriptor) value;
+            Binary binary = Binary.fromString(enumValueDesc.getName());
+            recordConsumer.addBinary(binary);
+            enumNameNumberPairs.putIfAbsent(enumValueDesc.getName(), 
enumValueDesc.getNumber());
+        }
+    }
+
+    class BooleanWriter extends FieldWriter {
+        @Override
+        final void writeRawValue(Object value) {
+            recordConsumer.addBoolean((Boolean) value);
+        }
+    }
+
+    class BinaryWriter extends FieldWriter {
+        @Override
+        final void writeRawValue(Object value) {
+            // Non-ByteString values can happen when recursions gets truncated.
+            ByteString byteString =
+                    value instanceof ByteString
+                            ? (ByteString) value
+                            // TODO: figure out a way to use MessageOrBuilder
+                            : value instanceof Message
+                                    ? ((Message) value).toByteString()
+                                    // Worst-case, just dump as plain java 
string.
+                                    : 
ByteString.copyFromUtf8(value.toString());
+            Binary binary = 
Binary.fromConstantByteArray(byteString.toByteArray());
+            recordConsumer.addBinary(binary);
+        }
+    }
+
+    class TimestampWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            Timestamp timestamp = (Timestamp) value;
+            recordConsumer.addLong(Timestamps.toNanos(timestamp));
+        }
+    }
+
+    class DateWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            Date date = (Date) value;
+            LocalDate localDate = LocalDate.of(date.getYear(), 
date.getMonth(), date.getDay());
+            recordConsumer.addInteger((int) localDate.toEpochDay());
+        }
+    }
+
+    class TimeWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            com.google.type.TimeOfDay timeOfDay = (com.google.type.TimeOfDay) 
value;
+            LocalTime localTime =
+                    LocalTime.of(
+                            timeOfDay.getHours(),
+                            timeOfDay.getMinutes(),
+                            timeOfDay.getSeconds(),
+                            timeOfDay.getNanos());
+            recordConsumer.addLong(localTime.toNanoOfDay());
+        }
+    }
+
+    class DoubleValueWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            recordConsumer.addDouble(((DoubleValue) value).getValue());
+        }
+    }
+
+    class FloatValueWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            recordConsumer.addFloat(((FloatValue) value).getValue());
+        }
+    }
+
+    class Int64ValueWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            recordConsumer.addLong(((Int64Value) value).getValue());
+        }
+    }
+
+    class UInt64ValueWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            recordConsumer.addLong(((UInt64Value) value).getValue());
+        }
+    }
+
+    class Int32ValueWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            recordConsumer.addInteger(((Int32Value) value).getValue());
+        }
+    }
+
+    class UInt32ValueWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            recordConsumer.addLong(((UInt32Value) value).getValue());
+        }
+    }
+
+    class BoolValueWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            recordConsumer.addBoolean(((BoolValue) value).getValue());
+        }
+    }
+
+    class StringValueWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            Binary binaryString = Binary.fromString(((StringValue) 
value).getValue());
+            recordConsumer.addBinary(binaryString);
+        }
+    }
+
+    class BytesValueWriter extends FieldWriter {
+        @Override
+        void writeRawValue(Object value) {
+            byte[] byteArray = ((BytesValue) value).getValue().toByteArray();
+            Binary binary = Binary.fromConstantByteArray(byteArray);
+            recordConsumer.addBinary(binary);
+        }
+    }
+
+    private FieldWriter unknownType(FieldDescriptor fieldDescriptor) {
+        String exceptionMsg =
+                "Unknown type with descriptor \""
+                        + fieldDescriptor
+                        + "\" and type \""
+                        + fieldDescriptor.getJavaType()
+                        + "\".";
+        throw new InvalidRecordException(exceptionMsg);
+    }
+}
+
+/**
+ * Minimal schema converter extracting only needed behavior for the patched 
support. For the test
+ * cases we only rely on primitive field mappings, so we can forward directly 
to the real converter
+ * if present; else implement minimal mapping. To minimize risk, we 
reflectively invoke the original
+ * ProtoSchemaConverter if available.
+ */
+class PatchedProtoSchemaConverter {
+    private final ParquetConfiguration configuration;
+
+    PatchedProtoSchemaConverter(ParquetConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    MessageType convert(Descriptor descriptor) {
+        try {
+            Class<?> clazz = 
Class.forName("org.apache.parquet.proto.ProtoSchemaConverter");
+            Object inst =
+                    
clazz.getConstructor(ParquetConfiguration.class).newInstance(configuration);
+            return (MessageType)
+                    clazz.getMethod("convert", Descriptor.class).invoke(inst, 
descriptor);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to delegate to 
ProtoSchemaConverter", e);
+        }
+    }
+}
diff --git 
a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupportTest.java
 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupportTest.java
new file mode 100644
index 00000000000..77f829285f2
--- /dev/null
+++ 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupportTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.protobuf;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.proto.ProtoParquetReader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+
+import static 
org.apache.flink.formats.parquet.protobuf.SimpleRecord.SimpleProtoRecord;
+import static 
org.apache.flink.formats.parquet.protobuf.TestProto2.TestProto2Record;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link PatchedProtoWriteSupport} to verify protobuf 4.x 
compatibility.
+ *
+ * <p>This test validates that the patched string-based syntax detection 
correctly handles both
+ * proto2 and proto3 messages when using protobuf 4.x, where the enum-based 
Syntax API was removed.
+ */
+class PatchedProtoWriteSupportTest {
+
+    @TempDir File tempDir;
+
+    /**
+     * Tests that proto3 messages can be written and read correctly with the 
patched write support.
+     */
+    @Test
+    void testProto3SyntaxDetection() throws IOException {
+        File outputFile = new File(tempDir, "proto3_test.parquet");
+        Path path = new Path(outputFile.toURI());
+
+        // Create a proto3 message
+        SimpleProtoRecord record =
+                SimpleProtoRecord.newBuilder()
+                        .setFoo("test_foo")
+                        .setBar("test_bar")
+                        .setNum(42)
+                        .build();
+
+        // Write using PatchedProtoWriteSupport directly
+        try (ParquetWriter<SimpleProtoRecord> writer =
+                new ParquetWriter<>(
+                        path,
+                        new 
PatchedProtoWriteSupport<>(SimpleProtoRecord.class),
+                        CompressionCodecName.SNAPPY,
+                        ParquetWriter.DEFAULT_BLOCK_SIZE,
+                        ParquetWriter.DEFAULT_PAGE_SIZE)) {
+            writer.write(record);
+        }
+
+        // Read back and verify
+        try (ParquetReader<SimpleProtoRecord.Builder> reader =
+                
ProtoParquetReader.<SimpleProtoRecord.Builder>builder(path).build()) {
+            SimpleProtoRecord.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            assertThat(readRecord.build()).isEqualTo(record);
+        }
+    }
+
+    /**
+     * Tests that proto2 messages can be written and read correctly with the 
patched write support.
+     */
+    @Test
+    void testProto2SyntaxDetection() throws IOException {
+        File outputFile = new File(tempDir, "proto2_test.parquet");
+        Path path = new Path(outputFile.toURI());
+
+        // Create a proto2 message with only some fields set
+        TestProto2Record record =
+                
TestProto2Record.newBuilder().setName("test_name").setValue(123).build();
+
+        // Write using PatchedProtoWriteSupport directly
+        try (ParquetWriter<TestProto2Record> writer =
+                new ParquetWriter<>(
+                        path,
+                        new PatchedProtoWriteSupport<>(TestProto2Record.class),
+                        CompressionCodecName.SNAPPY,
+                        ParquetWriter.DEFAULT_BLOCK_SIZE,
+                        ParquetWriter.DEFAULT_PAGE_SIZE)) {
+            writer.write(record);
+        }
+
+        // Read back and verify
+        try (ParquetReader<TestProto2Record.Builder> reader =
+                
ProtoParquetReader.<TestProto2Record.Builder>builder(path).build()) {
+            TestProto2Record.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            TestProto2Record result = readRecord.build();
+            assertThat(result.getName()).isEqualTo("test_name");
+            assertThat(result.getValue()).isEqualTo(123);
+            // flag field was not set, should be default
+            assertThat(result.hasFlag()).isFalse();
+        }
+    }
+
+    /**
+     * Tests that proto3 messages with default values are handled correctly.
+     *
+     * <p>In proto3, all fields are written including those with default 
values.
+     */
+    @Test
+    void testProto3WithDefaults() throws IOException {
+        File outputFile = new File(tempDir, "proto3_defaults.parquet");
+        Path path = new Path(outputFile.toURI());
+
+        // Create a proto3 message with default values
+        SimpleProtoRecord record =
+                
SimpleProtoRecord.newBuilder().setFoo("").setBar("").setNum(0).build();
+
+        // Write using PatchedProtoWriteSupport
+        try (ParquetWriter<SimpleProtoRecord> writer =
+                new ParquetWriter<>(
+                        path,
+                        new 
PatchedProtoWriteSupport<>(SimpleProtoRecord.class),
+                        CompressionCodecName.SNAPPY,
+                        ParquetWriter.DEFAULT_BLOCK_SIZE,
+                        ParquetWriter.DEFAULT_PAGE_SIZE)) {
+            writer.write(record);
+        }
+
+        // Read back and verify - proto3 should read all fields even if default
+        try (ParquetReader<SimpleProtoRecord.Builder> reader =
+                
ProtoParquetReader.<SimpleProtoRecord.Builder>builder(path).build()) {
+            SimpleProtoRecord.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            assertThat(readRecord.build()).isEqualTo(record);
+        }
+    }
+
+    /**
+     * Tests that proto2 only writes fields that have been explicitly set.
+     *
+     * <p>In proto2, unset optional fields should not be written to the file.
+     */
+    @Test
+    void testProto2OnlyWritesSetFields() throws IOException {
+        File outputFile = new File(tempDir, "proto2_partial.parquet");
+        Path path = new Path(outputFile.toURI());
+
+        // Create a proto2 message with only one field set
+        TestProto2Record record = 
TestProto2Record.newBuilder().setName("only_name").build();
+
+        // Write using PatchedProtoWriteSupport
+        try (ParquetWriter<TestProto2Record> writer =
+                new ParquetWriter<>(
+                        path,
+                        new PatchedProtoWriteSupport<>(TestProto2Record.class),
+                        CompressionCodecName.SNAPPY,
+                        ParquetWriter.DEFAULT_BLOCK_SIZE,
+                        ParquetWriter.DEFAULT_PAGE_SIZE)) {
+            writer.write(record);
+        }
+
+        // Read back and verify
+        try (ParquetReader<TestProto2Record.Builder> reader =
+                
ProtoParquetReader.<TestProto2Record.Builder>builder(path).build()) {
+            TestProto2Record.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            TestProto2Record result = readRecord.build();
+            assertThat(result.getName()).isEqualTo("only_name");
+            // value and flag were not set
+            assertThat(result.hasValue()).isFalse();
+            assertThat(result.hasFlag()).isFalse();
+        }
+    }
+
+    /**
+     * Integration test using ParquetProtoWriters (Flink's production API).
+     *
+     * <p>This validates that PatchedProtoWriteSupport works correctly when 
used through Flink's
+     * ParquetProtoWriters factory, which is the actual production code path.
+     */
+    @Test
+    void testViaParquetProtoWritersForProto3() throws IOException {
+        File outputFile = new File(tempDir, "proto3_via_writers.parquet");
+        Path hadoopPath = new Path(outputFile.toURI());
+        OutputFile outputFileObj =
+                HadoopOutputFile.fromPath(hadoopPath, new 
org.apache.hadoop.conf.Configuration());
+
+        // Create a proto3 message
+        SimpleProtoRecord record =
+                SimpleProtoRecord.newBuilder()
+                        .setFoo("via_writers")
+                        .setBar("test")
+                        .setNum(99)
+                        .build();
+
+        // Write using ParquetProtoWriters (production code path)
+        try (ParquetWriter<SimpleProtoRecord> writer =
+                new ParquetProtoWriters.ParquetProtoWriterBuilder<>(
+                                outputFileObj, SimpleProtoRecord.class)
+                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .build()) {
+            writer.write(record);
+        }
+
+        // Read back and verify
+        try (ParquetReader<SimpleProtoRecord.Builder> reader =
+                
ProtoParquetReader.<SimpleProtoRecord.Builder>builder(hadoopPath).build()) {
+            SimpleProtoRecord.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            assertThat(readRecord.build()).isEqualTo(record);
+        }
+    }
+
+    /**
+     * Integration test using ParquetProtoWriters for proto2 messages.
+     *
+     * <p>Verifies that proto2 syntax detection works correctly through the 
production API.
+     */
+    @Test
+    void testViaParquetProtoWritersForProto2() throws IOException {
+        File outputFile = new File(tempDir, "proto2_via_writers.parquet");
+        Path hadoopPath = new Path(outputFile.toURI());
+        OutputFile outputFileObj =
+                HadoopOutputFile.fromPath(hadoopPath, new 
org.apache.hadoop.conf.Configuration());
+
+        // Create a proto2 message with partial fields
+        TestProto2Record record =
+                
TestProto2Record.newBuilder().setName("proto2_writer").setFlag(true).build();
+
+        // Write using ParquetProtoWriters (production code path)
+        try (ParquetWriter<TestProto2Record> writer =
+                new ParquetProtoWriters.ParquetProtoWriterBuilder<>(
+                                outputFileObj, TestProto2Record.class)
+                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .build()) {
+            writer.write(record);
+        }
+
+        // Read back and verify
+        try (ParquetReader<TestProto2Record.Builder> reader =
+                
ProtoParquetReader.<TestProto2Record.Builder>builder(hadoopPath).build()) {
+            TestProto2Record.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            TestProto2Record result = readRecord.build();
+            assertThat(result.getName()).isEqualTo("proto2_writer");
+            assertThat(result.getFlag()).isTrue();
+            // value was not set
+            assertThat(result.hasValue()).isFalse();
+        }
+    }
+}
diff --git 
a/flink-formats/flink-parquet/src/test/resources/protobuf/test_proto2.proto 
b/flink-formats/flink-parquet/src/test/resources/protobuf/test_proto2.proto
new file mode 100644
index 00000000000..d580e93076c
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/resources/protobuf/test_proto2.proto
@@ -0,0 +1,9 @@
+syntax = "proto2";
+
+package org.apache.flink.formats.parquet.protobuf;
+
+message TestProto2Record {
+  optional string name = 1;
+  optional int32 value = 2;
+  optional bool flag = 3;
+}
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
index 29665f2a306..730cc897a0d 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
-import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +68,7 @@ public class ProtoToRowConverter {
                             Thread.currentThread().getContextClassLoader());
             String fullMessageClassName = 
PbFormatUtils.getFullJavaName(descriptor);
             boolean readDefaultValuesForPrimitiveTypes = 
formatConfig.isReadDefaultValues();
-            if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
+            if ("proto3".equals(descriptor.getFile().toProto().getSyntax())) {
                 // pb3 always read default values for primitive types
                 readDefaultValuesForPrimitiveTypes = true;
             }
diff --git 
a/flink-formats/flink-sql-protobuf/src/main/resources/META-INF/NOTICE 
b/flink-formats/flink-sql-protobuf/src/main/resources/META-INF/NOTICE
index c1b0186bcf6..38197b7abae 100644
--- a/flink-formats/flink-sql-protobuf/src/main/resources/META-INF/NOTICE
+++ b/flink-formats/flink-sql-protobuf/src/main/resources/META-INF/NOTICE
@@ -7,4 +7,4 @@ The Apache Software Foundation (http://www.apache.org/).
 This project bundles the following dependencies under BSD-3 License 
(https://opensource.org/licenses/BSD-3-Clause).
 See bundled license files for details.
 
-- com.google.protobuf:protobuf-java:3.21.7
+- com.google.protobuf:protobuf-java:4.32.1
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
index 3df85ee3a7e..7ff4d32fa13 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
 import org.apache.flink.util.Preconditions;
 
-import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.GeneratedMessage;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.graph.TimerReference;
 
@@ -50,14 +50,14 @@ public class BeamTablePythonFunctionRunner extends 
BeamPythonFunctionRunner {
     /** The urn which represents the function kind to be executed. */
     private final String functionUrn;
 
-    private final GeneratedMessageV3 userDefinedFunctionProto;
+    private final GeneratedMessage userDefinedFunctionProto;
 
     public BeamTablePythonFunctionRunner(
             Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
-            GeneratedMessageV3 userDefinedFunctionProto,
+            GeneratedMessage userDefinedFunctionProto,
             FlinkMetricContainer flinkMetricContainer,
             KeyedStateBackend<?> keyedStateBackend,
             TypeSerializer<?> keySerializer,
@@ -124,7 +124,7 @@ public class BeamTablePythonFunctionRunner extends 
BeamPythonFunctionRunner {
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
-            GeneratedMessageV3 userDefinedFunctionProto,
+            GeneratedMessage userDefinedFunctionProto,
             FlinkMetricContainer flinkMetricContainer,
             MemoryManager memoryManager,
             double managedMemoryFraction,
@@ -151,7 +151,7 @@ public class BeamTablePythonFunctionRunner extends 
BeamPythonFunctionRunner {
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
-            GeneratedMessageV3 userDefinedFunctionProto,
+            GeneratedMessage userDefinedFunctionProto,
             FlinkMetricContainer flinkMetricContainer,
             KeyedStateBackend<?> keyedStateBackend,
             TypeSerializer<?> keySerializer,
diff --git a/flink-python/src/main/resources/META-INF/NOTICE 
b/flink-python/src/main/resources/META-INF/NOTICE
index b6768c3e011..6a7f3ade231 100644
--- a/flink-python/src/main/resources/META-INF/NOTICE
+++ b/flink-python/src/main/resources/META-INF/NOTICE
@@ -34,7 +34,7 @@ This project bundles the following dependencies under the BSD 
license.
 See bundled license files for details
 
 - net.sf.py4j:py4j:0.10.9.7
-- com.google.protobuf:protobuf-java:3.21.7
+- com.google.protobuf:protobuf-java:4.32.1
 
 This project bundles the following dependencies under the MIT license. 
(https://opensource.org/licenses/MIT)
 See bundled license files for details.
diff --git a/pom.xml b/pom.xml
index b9cc0ab6c8e..f3d2ddc194a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,7 +162,7 @@ under the License.
                <assertj.version>3.27.3</assertj.version>
                <py4j.version>0.10.9.7</py4j.version>
                <beam.version>2.54.0</beam.version>
-               <protoc.version>3.21.7</protoc.version>
+               <protoc.version>4.32.1</protoc.version>
                <okhttp.version>3.14.9</okhttp.version>
                <testcontainers.version>1.20.2</testcontainers.version>
                <lz4.version>1.8.0</lz4.version>

Reply via email to