This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ffb7edea75 [FLINK-38189][core][python] Introduce RowFieldExtractorSchema for Row field serialization (#27353)
6ffb7edea75 is described below

commit 6ffb7edea7591d59ac09932acb2d00de5f53c8d2
Author: Nflrijal <[email protected]>
AuthorDate: Fri Jan 9 11:57:40 2026 +0530

    [FLINK-38189][core][python] Introduce RowFieldExtractorSchema for Row field serialization (#27353)
    
    This commit introduces RowFieldExtractorSchema, a new SerializationSchema that
    extracts and serializes a specific field from a Row object. This is particularly
    useful for Kafka scenarios where keys and values need separate serialization.
    
    ---------
    
    Co-authored-by: Noufal Rijal <Nflrijal>
---
 .../serialization/RowFieldExtractorSchema.java     | 122 ++++++++++++++
 .../serialization/RowFieldExtractorSchemaTest.java | 119 ++++++++++++++
 flink-python/pyflink/common/serialization.py       |  55 ++++++-
 .../common/tests/test_serialization_schemas.py     | 114 ++++++++++++-
 .../kafka_row_field_extractor_example.py           | 176 +++++++++++++++++++++
 5 files changed, 584 insertions(+), 2 deletions(-)
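
The extraction contract the new schema enforces is simple: the target field
must already be a byte array, a null Row or a null field serializes to an
empty byte array, and anything else is rejected. A minimal plain-Python
sketch of that contract (illustrative only; the committed implementation is
the Java class below, which also range-checks the index against the Row's
arity):

    def extract_field(row, field_index):
        """Plain-Python mirror of RowFieldExtractorSchema.serialize()."""
        # A missing row or a null field serializes to an empty byte string.
        if row is None or row[field_index] is None:
            return b""
        field = row[field_index]
        # Only byte arrays are accepted; callers pre-serialize everything else.
        if not isinstance(field, (bytes, bytearray)):
            raise ValueError("Field at index %d must be bytes, but was %s"
                             % (field_index, type(field).__name__))
        return bytes(field)

    assert extract_field((b"key-bytes", b"value-bytes"), 0) == b"key-bytes"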

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/RowFieldExtractorSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/RowFieldExtractorSchema.java
new file mode 100644
index 00000000000..a514380bee4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/RowFieldExtractorSchema.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Serialization schema that extracts a specific field from a {@link Row} and returns it as a byte
+ * array.
+ *
+ * <p>The field is required to be of type {@code byte[]}. This schema is particularly useful when
+ * using Flink with Kafka, where you may want to use one Row field as the message key and another as
+ * the value and perform the conversion to bytes explicitly in user code.
+ *
+ * <p>Example usage with Kafka:
+ *
+ * <pre>{@code
+ * KafkaSink<Row> sink = KafkaSink.<Row>builder()
+ *     .setBootstrapServers(bootstrapServers)
+ *     .setRecordSerializer(
+ *         KafkaRecordSerializationSchema.builder()
+ *             .setTopic("my-topic")
+ *             .setKeySerializationSchema(new RowFieldExtractorSchema(0))    // field 0 as key
+ *             .setValueSerializationSchema(new RowFieldExtractorSchema(1))  // field 1 as value
+ *             .build())
+ *     .build();
+ * }</pre>
+ */
+@PublicEvolving
+public class RowFieldExtractorSchema implements SerializationSchema<Row> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The index of the field to extract from the Row. */
+    private final int fieldIndex;
+
+    /**
+     * Creates a new RowFieldExtractorSchema that extracts the field at the specified index.
+     *
+     * @param fieldIndex the zero-based index of the field to extract
+     * @throws IllegalArgumentException if fieldIndex is negative
+     */
+    public RowFieldExtractorSchema(int fieldIndex) {
+        checkArgument(fieldIndex >= 0, "Field index must be non-negative, got: %s", fieldIndex);
+        this.fieldIndex = fieldIndex;
+    }
+
+    /**
+     * Gets the field index being extracted.
+     *
+     * @return the field index
+     */
+    @VisibleForTesting
+    public int getFieldIndex() {
+        return fieldIndex;
+    }
+
+    @Override
+    public byte[] serialize(@Nullable Row element) {
+        if (element == null) {
+            return new byte[0];
+        }
+
+        checkArgument(
+                fieldIndex < element.getArity(),
+                "Cannot access field %s in Row with arity %s",
+                fieldIndex,
+                element.getArity());
+
+        Object field = element.getField(fieldIndex);
+        if (field == null) {
+            return new byte[0];
+        }
+
+        if (!(field instanceof byte[])) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Field at index %s must be of type byte[], but was %s",
+                            fieldIndex, field.getClass().getName()));
+        }
+
+        return (byte[]) field;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RowFieldExtractorSchema that = (RowFieldExtractorSchema) o;
+        return fieldIndex == that.fieldIndex;
+    }
+
+    @Override
+    public int hashCode() {
+        return fieldIndex;
+    }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/serialization/RowFieldExtractorSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/RowFieldExtractorSchemaTest.java
new file mode 100644
index 00000000000..8b3e36aae71
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/RowFieldExtractorSchemaTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link RowFieldExtractorSchema}. */
+class RowFieldExtractorSchemaTest {
+
+    @Test
+    void testSerializeByteArrayField() {
+        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(0);
+        byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
+        Row row = Row.of(value, 123);
+
+        byte[] result = schema.serialize(row);
+
+        assertThat(result).isEqualTo(value);
+    }
+
+    @Test
+    void testSerializeNonByteArrayFieldThrowsException() {
+        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(1);
+        Row row = Row.of("key", 42); // field 1 is Integer, not byte[]
+
+        assertThatThrownBy(() -> schema.serialize(row))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("must be of type byte[]");
+    }
+
+    @Test
+    void testSerializeNullRow() {
+        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(0);
+
+        byte[] result = schema.serialize(null);
+
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    void testSerializeNullField() {
+        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(0);
+        Row row = Row.of(null, "value");
+
+        byte[] result = schema.serialize(row);
+
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    void testSerializeOutOfBoundsIndex() {
+        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(5);
+        Row row = Row.of("field0", "field1");
+
+        assertThatThrownBy(() -> schema.serialize(row))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Cannot access field 5 in Row with arity 2");
+    }
+
+    @Test
+    void testNegativeFieldIndexThrowsException() {
+        assertThatThrownBy(() -> new RowFieldExtractorSchema(-1))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Field index must be non-negative");
+    }
+
+    @Test
+    void testSerializability() throws IOException, ClassNotFoundException {
+        RowFieldExtractorSchema schema = new RowFieldExtractorSchema(3);
+
+        RowFieldExtractorSchema deserialized =
+                InstantiationUtil.deserializeObject(
+                        InstantiationUtil.serializeObject(schema), getClass().getClassLoader());
+
+        assertThat(deserialized.getFieldIndex()).isEqualTo(3);
+    }
+
+    @Test
+    void testEquals() {
+        RowFieldExtractorSchema schema1 = new RowFieldExtractorSchema(1);
+        RowFieldExtractorSchema schema2 = new RowFieldExtractorSchema(1);
+        RowFieldExtractorSchema schema3 = new RowFieldExtractorSchema(2);
+
+        assertThat(schema1).isEqualTo(schema2);
+        assertThat(schema1).isNotEqualTo(schema3);
+    }
+
+    @Test
+    void testHashCode() {
+        RowFieldExtractorSchema schema1 = new RowFieldExtractorSchema(1);
+        RowFieldExtractorSchema schema2 = new RowFieldExtractorSchema(1);
+
+        assertThat(schema1.hashCode()).isEqualTo(schema2.hashCode());
+    }
+}
diff --git a/flink-python/pyflink/common/serialization.py b/flink-python/pyflink/common/serialization.py
index 99dc540c47a..6025bd76c9a 100644
--- a/flink-python/pyflink/common/serialization.py
+++ b/flink-python/pyflink/common/serialization.py
@@ -25,7 +25,8 @@ __all__ = [
     'SimpleStringSchema',
     'ByteArraySchema',
     'Encoder',
-    'BulkWriterFactory'
+    'BulkWriterFactory',
+    'RowFieldExtractorSchema',
 ]
 
 
@@ -35,6 +36,7 @@ class SerializationSchema(object):
     into a different serialized representation. Most data sinks (for example Apache Kafka) require
     the data to be handed to them in a specific format (for example as byte strings).
     """
+
     def __init__(self, j_serialization_schema=None):
         self._j_serialization_schema = j_serialization_schema
 
@@ -48,6 +50,7 @@ class DeserializationSchema(object):
     In addition, the DeserializationSchema describes the produced type which lets Flink create
     internal serializers and structures to handle the type.
     """
+
     def __init__(self, j_deserialization_schema=None):
         self._j_deserialization_schema = j_deserialization_schema
 
@@ -126,3 +129,53 @@ class RowDataBulkWriterFactory(BulkWriterFactory):
 
     def get_row_type(self):
         return self._row_type
+
+
+class RowFieldExtractorSchema(SerializationSchema):
+    """
+    Serialization schema that extracts a specific field from a Row and returns it as a
+    byte array. The field at the specified index MUST be of type bytes (byte array).
+    This schema is particularly useful when using Flink with Kafka, where you may want to use a
+    specific field as the message key for partition routing.
+    The field being extracted must already be a byte array. Users are responsible for
+    converting their data to bytes before passing it to this schema.
+
+    Example usage with Kafka:
+        >>> from pyflink.common.serialization import RowFieldExtractorSchema
+        >>> from pyflink.datastream.connectors.kafka import KafkaSink, \\
+        ...     KafkaRecordSerializationSchema
+        >>>
+        >>> # User must convert data to bytes beforehand
+        >>> # For example: Row.of(b"key-bytes", b"value-bytes")
+        >>>
+        >>> sink = KafkaSink.builder() \\
+        ...     .set_bootstrap_servers("localhost:9092") \\
+        ...     .set_record_serializer(
+        ...         KafkaRecordSerializationSchema.builder()
+        ...             .set_topic("my-topic")
+        ...             .set_key_serialization_schema(RowFieldExtractorSchema(0))
+        ...             # Field 0 (must be bytes) as key
+        ...             .set_value_serialization_schema(RowFieldExtractorSchema(1))
+        ...             # Field 1 (must be bytes) as value
+        ...             .build()
+        ...     ) \\
+        ...     .build()
+
+    :param field_index: The zero-based index of the field to extract from the Row.
+        The field at this index must be of type bytes.
+    """
+
+    def __init__(self, field_index: int):
+        """
+        Creates a new RowFieldExtractorSchema that extracts the field at the specified index.
+
+        :param field_index: The zero-based index of the field to extract (must be non-negative).
+        :raises ValueError: If field_index is negative.
+        """
+        if field_index < 0:
+            raise ValueError(f"Field index must be non-negative, got: {field_index}")
+        gateway = get_gateway()
+        j_row_field_extractor_schema = gateway.jvm.org.apache.flink.api.common.serialization \
+            .RowFieldExtractorSchema(field_index)
+        super(RowFieldExtractorSchema, self).__init__(
+            j_serialization_schema=j_row_field_extractor_schema)
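
Because the schema only extracts and never converts, the producing job must
map its records into Rows of raw bytes before the sink. A hedged sketch of
that upstream step (the record shape and the names to_bytes_row/ROW_OF_BYTES
are illustrative, not part of this commit):

    import json

    from pyflink.common import Row
    from pyflink.common.typeinfo import Types

    # Both fields are byte arrays: field 0 feeds the key schema, field 1 the value schema.
    ROW_OF_BYTES = Types.ROW([Types.PRIMITIVE_ARRAY(Types.BYTE()),
                              Types.PRIMITIVE_ARRAY(Types.BYTE())])

    def to_bytes_row(record):
        # record is assumed to look like {"id": "user-001", "payload": {...}}.
        return Row(record["id"].encode("utf-8"),
                   json.dumps(record["payload"]).encode("utf-8"))

    # In a job: ds = ds.map(to_bytes_row, output_type=ROW_OF_BYTES)
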
diff --git a/flink-python/pyflink/common/tests/test_serialization_schemas.py b/flink-python/pyflink/common/tests/test_serialization_schemas.py
index bbbe8b8f123..7f10da4d841 100644
--- a/flink-python/pyflink/common/tests/test_serialization_schemas.py
+++ b/flink-python/pyflink/common/tests/test_serialization_schemas.py
@@ -15,8 +15,13 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
 ################################################################################
-from pyflink.common.serialization import SimpleStringSchema, ByteArraySchema
+from pyflink.common.serialization import (
+    SimpleStringSchema,
+    ByteArraySchema,
+    RowFieldExtractorSchema,
+)
 from pyflink.testing.test_case_utils import PyFlinkTestCase
+from pyflink.java_gateway import get_gateway
 
 
 class SimpleStringSchemaTests(PyFlinkTestCase):
@@ -39,3 +44,110 @@ class SimpleByteSchemaTests(PyFlinkTestCase):
                          simple_byte_schema._j_serialization_schema.serialize(expected_bytes))
         self.assertEqual(expected_bytes, simple_byte_schema._j_deserialization_schema
                          .deserialize(expected_bytes))
+
+
+class RowFieldExtractorSchemaTests(PyFlinkTestCase):
+    """Tests for RowFieldExtractorSchema."""
+
+    def test_row_field_extractor_schema_creation(self):
+        """Test RowFieldExtractorSchema can be created with valid index."""
+        schema = RowFieldExtractorSchema(0)
+        self.assertIsNotNone(schema._j_serialization_schema)
+
+    def test_serialize_byte_array_field(self):
+        """Test serializing a byte array field from a Row."""
+        schema = RowFieldExtractorSchema(0)
+        gateway = get_gateway()
+        j_row = gateway.jvm.org.apache.flink.types.Row(2)
+
+        # Set byte array field
+        test_bytes = "test-value".encode('utf-8')
+        j_row.setField(0, test_bytes)
+        j_row.setField(1, "other-data".encode('utf-8'))
+
+        result = schema._j_serialization_schema.serialize(j_row)
+        self.assertEqual(test_bytes, bytes(result))
+
+    def test_serialize_second_field(self):
+        """Test serializing byte array from second field of a Row."""
+        schema = RowFieldExtractorSchema(1)
+        gateway = get_gateway()
+        j_row = gateway.jvm.org.apache.flink.types.Row(2)
+
+        test_bytes = "field-1-value".encode('utf-8')
+        j_row.setField(0, "field-0".encode('utf-8'))
+        j_row.setField(1, test_bytes)
+
+        result = schema._j_serialization_schema.serialize(j_row)
+        self.assertEqual(test_bytes, bytes(result))
+
+    def test_serialize_null_row(self):
+        """Test serializing null Row returns empty byte array."""
+        schema = RowFieldExtractorSchema(0)
+        result = schema._j_serialization_schema.serialize(None)
+        self.assertEqual(0, len(result))
+
+    def test_serialize_null_field(self):
+        """Test serializing Row with null field returns empty byte array."""
+        schema = RowFieldExtractorSchema(0)
+        gateway = get_gateway()
+        j_row = gateway.jvm.org.apache.flink.types.Row(2)
+        j_row.setField(0, None)  # null field
+        j_row.setField(1, "value".encode('utf-8'))
+
+        result = schema._j_serialization_schema.serialize(j_row)
+        self.assertEqual(0, len(result))
+
+    def test_serialize_non_byte_array_raises_error(self):
+        """Test that non-byte-array field raises IllegalArgumentException."""
+        schema = RowFieldExtractorSchema(0)
+        gateway = get_gateway()
+        j_row = gateway.jvm.org.apache.flink.types.Row(2)
+
+        # set a string instead of byte array
+        j_row.setField(0, "not-bytes")
+        j_row.setField(1, "other")
+
+        with self.assertRaises(Exception):
+            schema._j_serialization_schema.serialize(j_row)
+        # Should get IllegalArgumentException from Java
+
+    def test_negative_field_index_raises_error(self):
+        """Test that negative field index raises ValueError."""
+        with self.assertRaises(ValueError) as context:
+            RowFieldExtractorSchema(-1)
+        self.assertIn("Field index must be non-negative", str(context.exception))
+
+    def test_get_field_index(self):
+        """Test that getFieldIndex returns correct value."""
+        schema = RowFieldExtractorSchema(3)
+        field_index = schema._j_serialization_schema.getFieldIndex()
+        self.assertEqual(3, field_index)
+
+    def test_multiple_schemas_with_different_indices(self):
+        """Test creating multiple schemas with different field indices."""
+        schema0 = RowFieldExtractorSchema(0)
+        schema1 = RowFieldExtractorSchema(1)
+        schema2 = RowFieldExtractorSchema(2)
+
+        self.assertEqual(0, schema0._j_serialization_schema.getFieldIndex())
+        self.assertEqual(1, schema1._j_serialization_schema.getFieldIndex())
+        self.assertEqual(2, schema2._j_serialization_schema.getFieldIndex())
+
+    def test_schema_equals(self):
+        """Test that schemas with same field index are considered equal."""
+        schema1 = RowFieldExtractorSchema(1)
+        schema2 = RowFieldExtractorSchema(1)
+        schema3 = RowFieldExtractorSchema(2)
+
+        self.assertTrue(schema1._j_serialization_schema.equals(schema2._j_serialization_schema))
+        self.assertFalse(schema1._j_serialization_schema.equals(schema3._j_serialization_schema))
+
+    def test_schema_hash_code(self):
+        """Test that schemas with same field index have same hash code."""
+        schema1 = RowFieldExtractorSchema(1)
+        schema2 = RowFieldExtractorSchema(1)
+
+        hash1 = schema1._j_serialization_schema.hashCode()
+        hash2 = schema2._j_serialization_schema.hashCode()
+        self.assertEqual(hash1, hash2)
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_row_field_extractor_example.py b/flink-python/pyflink/examples/datastream/connectors/kafka_row_field_extractor_example.py
new file mode 100644
index 00000000000..53a74f3edbb
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_row_field_extractor_example.py
@@ -0,0 +1,176 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+Example demonstrating RowFieldExtractorSchema usage with Kafka.
+
+This example shows how to use RowFieldExtractorSchema to serialize specific
+Row fields as Kafka message keys and values. The schema requires fields to
+be byte arrays, giving you full control over serialization.
+
+Requirements:
+    - Kafka running on localhost:9092
+    - Topic 'row-extractor-example' created
+    - Kafka connector JAR in classpath
+
+Usage:
+    python kafka_row_field_extractor_example.py
+"""
+
+import json
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.kafka import (
+    KafkaSink,
+    KafkaRecordSerializationSchema,
+    DeliveryGuarantee
+)
+from pyflink.common.serialization import RowFieldExtractorSchema
+from pyflink.common.typeinfo import Types
+from pyflink.common import Row
+
+
+def serialize_to_json_bytes(data):
+    """Helper function to serialize data to JSON byte array."""
+    return json.dumps(data).encode('utf-8')
+
+
+def serialize_key_bytes(key):
+    """Helper function to serialize a key to byte array."""
+    return key.encode('utf-8')
+
+
+def create_sample_data():
+    """
+    Create sample e-commerce events.
+    Each event has: user_id (key) and event_data (value).
+    """
+    events = [
+        {
+            "user_id": "user-001",
+            "event": {"type": "purchase", "item": "laptop", "price": 999.99}
+        },
+        {
+            "user_id": "user-002",
+            "event": {"type": "view", "item": "phone", "timestamp": "2024-01-07T10:30:00"}
+        },
+        {
+            "user_id": "user-001",
+            "event": {"type": "add_to_cart", "item": "mouse", "quantity": 2}
+        },
+        {
+            "user_id": "user-003",
+            "event": {"type": "purchase", "item": "keyboard", "price": 79.99}
+        },
+    ]
+
+    # Convert to Rows with byte array fields
+    rows = []
+    for event in events:
+        row = Row(
+            serialize_key_bytes(event["user_id"]),      # Field 0: user_id as bytes (Kafka key)
+            serialize_to_json_bytes(event["event"])     # Field 1: event as JSON bytes (Kafka value)
+        )
+        rows.append(row)
+
+    return rows
+
+
+def kafka_row_field_extractor_example():
+    """
+    Demonstrate RowFieldExtractorSchema with Kafka Sink.
+
+    This example:
+    1. Creates sample e-commerce events
+    2. Converts user_id to bytes (for Kafka key)
+    3. Converts event data to JSON bytes (for Kafka value)
+    4. Uses RowFieldExtractorSchema to extract and send to Kafka
+    """
+
+    # Create execution environment
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.set_parallelism(1)
+
+    # Generate sample data
+    data = create_sample_data()
+
+    # Create DataStream with proper type information
+    # Row has 2 fields: both are byte arrays
+    ds = env.from_collection(
+        data,
+        type_info=Types.ROW([
+            Types.PRIMITIVE_ARRAY(Types.BYTE()),  # Field 0: user_id (key)
+            Types.PRIMITIVE_ARRAY(Types.BYTE())   # Field 1: event_data (value)
+        ])
+    )
+
+    # Optional: Print what we're sending (for debugging)
+    def print_event(row):
+        user_id = row[0].decode('utf-8')
+        event_data = json.loads(row[1].decode('utf-8'))
+        print(f"Sending: User={user_id}, Event={event_data}")
+        return row
+
+    ds = ds.map(
+        print_event,
+        output_type=Types.ROW([
+            Types.PRIMITIVE_ARRAY(Types.BYTE()),
+            Types.PRIMITIVE_ARRAY(Types.BYTE())
+        ])
+    )
+
+    # Create Kafka Sink with RowFieldExtractorSchema
+    kafka_sink = KafkaSink.builder() \
+        .set_bootstrap_servers("localhost:9092") \
+        .set_record_serializer(
+            KafkaRecordSerializationSchema.builder()
+            .set_topic("row-extractor-example")
+            # Extract field 0 (user_id) as Kafka key for partitioning
+            .set_key_serialization_schema(RowFieldExtractorSchema(0))
+            # Extract field 1 (event_data) as Kafka value
+            .set_value_serialization_schema(RowFieldExtractorSchema(1))
+            .build()
+        ) \
+        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+        .build()
+
+    # Send to Kafka
+    ds.sink_to(kafka_sink)
+
+    # Execute
+    env.execute("Kafka RowFieldExtractorSchema Example")
+
+
+if __name__ == '__main__':
+    print("=" * 70)
+    print("Kafka RowFieldExtractorSchema Example")
+    print("=" * 70)
+    print("\nMake sure:")
+    print("  1. Kafka is running on localhost:9092")
+    print("  2. Topic 'row-extractor-example' exists")
+    print("  3. Kafka connector JAR is in classpath")
+    print("\nTo create topic:")
+    print("  kafka-topics.sh --create --topic row-extractor-example \\")
+    print("    --bootstrap-server localhost:9092 --partitions 3")
+    print("\nTo consume messages:")
+    print("  kafka-console-consumer.sh --bootstrap-server localhost:9092 \\")
+    print("    --topic row-extractor-example --from-beginning \\")
+    print("    --property print.key=true --property key.separator=' => '")
+    print("\n" + "=" * 70 + "\n")
+
+    # Run the example
+    kafka_row_field_extractor_example()
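
Besides kafka-console-consumer, the example's output can be verified from
Python. A hedged sketch using the third-party kafka-python package (not part
of Flink or of this commit; install it separately):

    from kafka import KafkaConsumer  # pip install kafka-python

    consumer = KafkaConsumer(
        "row-extractor-example",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        consumer_timeout_ms=5000,  # stop iterating once the topic is drained
    )
    for record in consumer:
        # Keys and values arrive as exactly the byte arrays the schema extracted.
        print(record.key.decode("utf-8"), "=>", record.value.decode("utf-8"))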
