This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new f899c845 [AURON #2060] Introduce Auron Flink Kafka TableSource (#2082)
f899c845 is described below

commit f899c8456387c012881e78744eb11122c4922a50
Author: zhangmang <[email protected]>
AuthorDate: Tue Mar 10 19:29:23 2026 +0800

    [AURON #2060] Introduce Auron Flink Kafka TableSource (#2082)
    
    # Which issue does this PR close?
    
    Closes #2060
    
    # Rationale for this change
    * Implements the integration of the Flink Kafka source with the Auron
    Native engine
    
    # What changes are included in this PR?
    * add AuronKafkaSourceFunction
    * add AuronColumnarRowData
    * add SchemaConverters
    * modify FlinkArrowReader
    * modify FlinkArrowUtils
    * FlinkAuronConfiguration
    * AuronKafkaDynamicTableFactory
    * AuronKafkaDynamicTableSource
    
    # Are there any user-facing changes?
    * No
    
    # How was this patch tested?
    * There is currently no Kafka environment integration, so automated
    testing is not possible.
---
 auron-flink-extension/auron-flink-runtime/pom.xml  |  13 ++
 .../apache/auron/flink/arrow/FlinkArrowReader.java |  51 ++++-
 .../apache/auron/flink/arrow/FlinkArrowUtils.java  |   4 +
 .../configuration/FlinkAuronConfiguration.java     |   6 +
 .../kafka/AuronKafkaDynamicTableFactory.java       |   6 +-
 .../kafka/AuronKafkaDynamicTableSource.java        |  45 ++++-
 .../connector/kafka/AuronKafkaSourceFunction.java  | 187 +++++++++++++++++++
 .../flink/connector/kafka/KafkaConstants.java      |   6 +
 .../flink/table/data/AuronColumnarRowData.java     | 206 +++++++++++++++++++++
 .../apache/auron/flink/utils/SchemaConverters.java | 171 +++++++++++++++++
 10 files changed, 678 insertions(+), 17 deletions(-)

diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml 
b/auron-flink-extension/auron-flink-runtime/pom.xml
index 4998e04c..e3695bcd 100644
--- a/auron-flink-extension/auron-flink-runtime/pom.xml
+++ b/auron-flink-extension/auron-flink-runtime/pom.xml
@@ -58,6 +58,19 @@
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <!-- Table API Java dependencies (not included in the uber) -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-java-bridge</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- Test dependencies -->
     <dependency>
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
index aa0280af..04d35a12 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
@@ -51,6 +51,7 @@ import 
org.apache.auron.flink.arrow.vectors.ArrowTimestampColumnVector;
 import org.apache.auron.flink.arrow.vectors.ArrowTinyIntColumnVector;
 import org.apache.auron.flink.arrow.vectors.ArrowVarBinaryColumnVector;
 import org.apache.auron.flink.arrow.vectors.ArrowVarCharColumnVector;
+import org.apache.auron.flink.table.data.AuronColumnarRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.columnar.ColumnarRowData;
 import org.apache.flink.table.data.columnar.vector.ColumnVector;
@@ -82,13 +83,21 @@ public class FlinkArrowReader implements AutoCloseable {
     private final RowType rowType;
     private ColumnVector[] columnVectors;
     private VectorizedColumnBatch batch;
-    private ColumnarRowData reusableRow;
+    private AuronColumnarRowData reusableRow;
     private VectorSchemaRoot root;
 
     private FlinkArrowReader(ColumnVector[] columnVectors, VectorSchemaRoot 
root, RowType rowType) {
+        this(columnVectors, root, rowType, 0);
+    }
+
+    private FlinkArrowReader(
+            ColumnVector[] columnVectors, VectorSchemaRoot root, RowType 
rowType, int dataColStartIndex) {
         this.columnVectors = columnVectors;
         this.batch = new VectorizedColumnBatch(columnVectors);
-        this.reusableRow = new ColumnarRowData(batch);
+        this.reusableRow = new AuronColumnarRowData(batch);
+        if (dataColStartIndex > 0) {
+            this.reusableRow.setDataColStartIndex(dataColStartIndex);
+        }
         this.root = root;
         this.rowType = rowType;
     }
@@ -107,6 +116,24 @@ public class FlinkArrowReader implements AutoCloseable {
      * @throws UnsupportedOperationException if a LogicalType is not supported
      */
     public static FlinkArrowReader create(VectorSchemaRoot root, RowType 
rowType) {
+        return create(root, rowType, 0);
+    }
+
+    /**
+     * Creates a {@link FlinkArrowReader} from a {@link VectorSchemaRoot} and 
{@link RowType}.
+     *
+     * <p>The RowType must match the schema of the VectorSchemaRoot (same 
number of fields, matching
+     * types). Each Arrow field vector is wrapped in the appropriate Flink 
{@link ColumnVector}
+     * implementation based on the corresponding Flink {@link LogicalType}.
+     *
+     * @param root the Arrow VectorSchemaRoot containing the data
+     * @param rowType the Flink RowType describing the schema
+     * @param dataColStartIndex the index of the first user data column
+     * @return a new FlinkArrowReader
+     * @throws IllegalArgumentException if field counts do not match
+     * @throws UnsupportedOperationException if a LogicalType is not supported
+     */
+    public static FlinkArrowReader create(VectorSchemaRoot root, RowType 
rowType, int dataColStartIndex) {
         Preconditions.checkNotNull(root, "root must not be null");
         Preconditions.checkNotNull(rowType, "rowType must not be null");
         List<FieldVector> fieldVectors = root.getFieldVectors();
@@ -119,7 +146,7 @@ public class FlinkArrowReader implements AutoCloseable {
         for (int i = 0; i < fieldVectors.size(); i++) {
             columns[i] = createColumnVector(fieldVectors.get(i), 
fields.get(i).getType());
         }
-        return new FlinkArrowReader(columns, root, rowType);
+        return new FlinkArrowReader(columns, root, rowType, dataColStartIndex);
     }
 
     /**
@@ -152,6 +179,19 @@ public class FlinkArrowReader implements AutoCloseable {
      * @param newRoot the new VectorSchemaRoot, must not be null
      */
     public void reset(VectorSchemaRoot newRoot) {
+        reset(newRoot, 0);
+    }
+
+    /**
+     * Resets the reader to use a new {@link VectorSchemaRoot} with the same 
schema. Recreates
+     * column vector wrappers for the new root's field vectors.
+     *
+     * <p>The new root must have the same schema (same number and types of 
fields) as the original.
+     *
+     * @param newRoot the new VectorSchemaRoot, must not be null
+     * @param dataColStartIndex the index of the first user data column
+     */
+    public void reset(VectorSchemaRoot newRoot, int dataColStartIndex) {
         Preconditions.checkNotNull(newRoot, "newRoot must not be null");
         this.root = newRoot;
         List<FieldVector> newVectors = newRoot.getFieldVectors();
@@ -166,7 +206,10 @@ public class FlinkArrowReader implements AutoCloseable {
                     createColumnVector(newVectors.get(i), 
fields.get(i).getType());
         }
         this.batch = new VectorizedColumnBatch(columnVectors);
-        this.reusableRow = new ColumnarRowData(batch);
+        this.reusableRow = new AuronColumnarRowData(batch);
+        if (dataColStartIndex > 0) {
+            this.reusableRow.setDataColStartIndex(dataColStartIndex);
+        }
     }
 
     /**
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
index 273c66c0..ec027ec4 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
@@ -92,6 +92,10 @@ public final class FlinkArrowUtils {
         Runtime.getRuntime().addShutdownHook(new 
Thread(ROOT_ALLOCATOR::close));
     }
 
+    public static RootAllocator getRootAllocator() {
+        return ROOT_ALLOCATOR;
+    }
+
     /**
      * Creates a child allocator from the root allocator.
      *
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
index 26a81d3b..69d76bac 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
@@ -34,6 +34,12 @@ public class FlinkAuronConfiguration extends 
AuronConfiguration {
     // When using getOptional, the prefix will be automatically completed. If 
you only need to print the Option key,
     // please manually add the prefix.
     public static final String FLINK_PREFIX = "flink.";
+
+    public static final ConfigOption<Long> NATIVE_MEMORY_SIZE = new 
ConfigOption<>(Long.class)
+            .withKey("auron.native.memory.size")
+            .withDescription("The auron native memory size to use.")
+            .withDefaultValue(256 * 1024 * 1024L); // 256 MB
+
     private final Configuration flinkConfig;
 
     public FlinkAuronConfiguration() {
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
index ce9c09a3..177a7c39 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
@@ -102,16 +102,14 @@ public class AuronKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
                 formatConfig.put(KAFKA_PB_FORMAT_ROOT_MESSAGE_NAME_FIELD, 
tableOptions.get(PB_ROOT_MESSAGE_NAME));
                 formatConfig.put(KAFKA_PB_FORMAT_SKIP_FIELDS_FIELD, 
tableOptions.get(PB_SKIP_FIELDS));
             }
-            String formatConfigJson = mapper.writeValueAsString(formatConfig);
             return new AuronKafkaDynamicTableSource(
                     
context.getCatalogTable().getSchema().toPhysicalRowDataType(),
                     tableOptions.get(TOPIC),
                     kafkaPropertiesJson,
                     format,
-                    formatConfigJson,
+                    formatConfig,
                     tableOptions.get(BUFFER_SIZE),
-                    tableOptions.get(START_UP_MODE),
-                    tableOptions.get(NESTED_COLS_FIELD_MAPPING));
+                    tableOptions.get(START_UP_MODE));
         } catch (Exception e) {
             throw new FlinkRuntimeException("Could not create Auron Kafka 
dynamic table source", e);
         }
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index 80d536b7..5c7be005 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -16,9 +16,16 @@
  */
 package org.apache.auron.flink.connector.kafka;
 
+import java.util.Map;
+import java.util.UUID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -33,30 +40,27 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource {
     private final String kafkaTopic;
     private final String kafkaPropertiesJson;
     private final String format;
-    private final String formatConfigJson;
+    private final Map<String, String> formatConfig;
     private final int bufferSize;
     private final String startupMode;
-    private final String nestedColsMappingJson;
 
     public AuronKafkaDynamicTableSource(
             DataType physicalDataType,
             String kafkaTopic,
             String kafkaPropertiesJson,
             String format,
-            String formatConfigJson,
+            Map<String, String> formatConfig,
             int bufferSize,
-            String startupMode,
-            String nestedColsMappingJson) {
+            String startupMode) {
         final LogicalType physicalType = physicalDataType.getLogicalType();
         Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row 
data type expected.");
         this.physicalDataType = physicalDataType;
         this.kafkaTopic = kafkaTopic;
         this.kafkaPropertiesJson = kafkaPropertiesJson;
         this.format = format;
-        this.formatConfigJson = formatConfigJson;
+        this.formatConfig = formatConfig;
         this.bufferSize = bufferSize;
         this.startupMode = startupMode;
-        this.nestedColsMappingJson = nestedColsMappingJson;
     }
 
     @Override
@@ -66,12 +70,35 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource {
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) 
{
-        return null;
+        String auronOperatorId = "AuronKafkaSource-" + 
UUID.randomUUID().toString();
+        AuronKafkaSourceFunction sourceFunction = new AuronKafkaSourceFunction(
+                physicalDataType.getLogicalType(),
+                auronOperatorId,
+                kafkaTopic,
+                kafkaPropertiesJson,
+                format,
+                formatConfig,
+                bufferSize,
+                startupMode);
+        return new DataStreamScanProvider() {
+
+            @Override
+            public DataStream<RowData> produceDataStream(
+                    ProviderContext providerContext, 
StreamExecutionEnvironment execEnv) {
+                return execEnv.addSource(sourceFunction);
+            }
+
+            @Override
+            public boolean isBounded() {
+                return false;
+            }
+        };
     }
 
     @Override
     public DynamicTableSource copy() {
-        return null;
+        return new AuronKafkaDynamicTableSource(
+                physicalDataType, kafkaTopic, kafkaPropertiesJson, format, 
formatConfig, bufferSize, startupMode);
     }
 
     @Override
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
new file mode 100644
index 00000000..369f4b1d
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.connector.kafka;
+
+import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.*;
+import org.apache.auron.flink.arrow.FlinkArrowReader;
+import org.apache.auron.flink.arrow.FlinkArrowUtils;
+import org.apache.auron.flink.configuration.FlinkAuronConfiguration;
+import org.apache.auron.flink.runtime.operator.FlinkAuronFunction;
+import org.apache.auron.flink.utils.SchemaConverters;
+import org.apache.auron.jni.AuronAdaptor;
+import org.apache.auron.jni.AuronCallNativeWrapper;
+import org.apache.auron.metric.MetricNode;
+import org.apache.auron.protobuf.KafkaFormat;
+import org.apache.auron.protobuf.KafkaScanExecNode;
+import org.apache.auron.protobuf.KafkaStartupMode;
+import org.apache.auron.protobuf.PhysicalPlanNode;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Auron Kafka source function.
+ */
+public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData> implements FlinkAuronFunction {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AuronKafkaSourceFunction.class);
+
+    private final LogicalType outputType;
+    private final String auronOperatorId;
+    private final String topic;
+    private final String kafkaPropertiesJson;
+    private final String format;
+    private final Map<String, String> formatConfig;
+    private final int bufferSize;
+    private final String startupMode;
+    private transient PhysicalPlanNode physicalPlanNode;
+    private volatile boolean isRunning;
+    private transient String auronOperatorIdWithSubtaskIndex;
+    private transient MetricNode nativeMetric;
+
+    public AuronKafkaSourceFunction(
+            LogicalType outputType,
+            String auronOperatorId,
+            String topic,
+            String kafkaPropertiesJson,
+            String format,
+            Map<String, String> formatConfig,
+            int bufferSize,
+            String startupMode) {
+        this.outputType = outputType;
+        this.auronOperatorId = auronOperatorId;
+        this.topic = topic;
+        this.kafkaPropertiesJson = kafkaPropertiesJson;
+        this.format = format;
+        this.formatConfig = formatConfig;
+        this.bufferSize = bufferSize;
+        this.startupMode = startupMode;
+    }
+
+    @Override
+    public void open(Configuration config) throws Exception {
+        // init auron plan
+        PhysicalPlanNode.Builder sourcePlan = PhysicalPlanNode.newBuilder();
+        KafkaScanExecNode.Builder scanExecNode = 
KafkaScanExecNode.newBuilder();
+        scanExecNode.setKafkaTopic(this.topic);
+        scanExecNode.setKafkaPropertiesJson(this.kafkaPropertiesJson);
+        
scanExecNode.setDataFormat(KafkaFormat.valueOf(this.format.toUpperCase(Locale.ROOT)));
+        ObjectMapper mapper = new ObjectMapper();
+        
scanExecNode.setFormatConfigJson(mapper.writeValueAsString(formatConfig));
+        scanExecNode.setBatchSize(this.bufferSize);
+        if 
(this.format.equalsIgnoreCase(KafkaConstants.KAFKA_FORMAT_PROTOBUF)) {
+            // copy pb desc file
+            ClassLoader userClassloader = 
Thread.currentThread().getContextClassLoader();
+            String pbDescFileName = 
formatConfig.get(KafkaConstants.KAFKA_PB_FORMAT_PB_DESC_FILE_FIELD);
+            InputStream in = 
userClassloader.getResourceAsStream(pbDescFileName);
+            String pwd = System.getenv("PWD");
+            if (new File(pwd).exists()) {
+                File descFile = new File(pwd + "/" + pbDescFileName);
+                if (!descFile.exists()) {
+                    LOG.info("Auron kafka source writer pb desc file: " + 
pbDescFileName);
+                    FileUtils.copyInputStreamToFile(in, descFile);
+                } else {
+                    LOG.warn("Auron kafka source pb desc file already exist, 
skip copy " + pbDescFileName);
+                }
+            } else {
+                throw new RuntimeException("PWD is not exist");
+            }
+        }
+        // add kafka meta fields
+        scanExecNode.setSchema(SchemaConverters.convertToAuronSchema((RowType) 
outputType, true));
+        auronOperatorIdWithSubtaskIndex =
+                this.auronOperatorId + "-" + 
getRuntimeContext().getIndexOfThisSubtask();
+        scanExecNode.setAuronOperatorId(auronOperatorIdWithSubtaskIndex);
+        scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode));
+        sourcePlan.setKafkaScan(scanExecNode.build());
+        this.physicalPlanNode = sourcePlan.build();
+        this.isRunning = true;
+    }
+
+    @Override
+    public void run(SourceContext<RowData> sourceContext) throws Exception {
+        nativeMetric = new MetricNode(new ArrayList<>()) {
+            @Override
+            public void add(String name, long value) {
+                // TODO Integration with Flink metrics
+                LOG.info(String.format("Metric Auron Source: %s = %s", name, 
value));
+            }
+        };
+        List<RowType.RowField> fieldList = new LinkedList<>();
+        fieldList.add(new RowType.RowField(KAFKA_AURON_META_PARTITION_ID, new 
IntType(false)));
+        fieldList.add(new RowType.RowField(KAFKA_AURON_META_OFFSET, new 
BigIntType(false)));
+        fieldList.add(new RowType.RowField(KAFKA_AURON_META_TIMESTAMP, new 
BigIntType(false)));
+        fieldList.addAll(((RowType) outputType).getFields());
+        RowType auronOutputRowType = new RowType(fieldList);
+        while (this.isRunning) {
+            AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
+                    FlinkArrowUtils.getRootAllocator(),
+                    physicalPlanNode,
+                    nativeMetric,
+                    0,
+                    0,
+                    0,
+                    AuronAdaptor.getInstance()
+                            .getAuronConfiguration()
+                            
.getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
+            while (wrapper.loadNextBatch(batch -> {
+                FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, 
auronOutputRowType, 3);
+                for (int i = 0; i < batch.getRowCount(); i++) {
+                    sourceContext.collect(arrowReader.read(i));
+                }
+            })) {}
+            ;
+        }
+        LOG.info("Auron kafka source run end");
+    }
+
+    @Override
+    public void cancel() {
+        this.isRunning = false;
+    }
+
+    @Override
+    public List<PhysicalPlanNode> getPhysicalPlanNodes() {
+        return Collections.singletonList(physicalPlanNode);
+    }
+
+    @Override
+    public RowType getOutputType() {
+        return (RowType) outputType;
+    }
+
+    @Override
+    public String getAuronOperatorId() {
+        return auronOperatorId;
+    }
+
+    @Override
+    public MetricNode getMetricNode() {
+        return nativeMetric;
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
index d733ccbe..bf249102 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
@@ -232,4 +232,10 @@ public class KafkaConstants {
     public static final String KAFKA_PB_FORMAT_ROOT_MESSAGE_NAME_FIELD = 
"root_message_name";
     public static final String KAFKA_PB_FORMAT_SKIP_FIELDS_FIELD = 
"skip_fields";
     public static final String KAFKA_PB_FORMAT_NESTED_COL_MAPPING_FIELD = 
"nested_col_mapping";
+
+    public static final String KAFKA_AURON_META_PARTITION_ID = 
"serialized_kafka_records_partition";
+    public static final String KAFKA_AURON_META_OFFSET = 
"serialized_kafka_records_offset";
+    public static final String KAFKA_AURON_META_TIMESTAMP = 
"serialized_kafka_records_timestamp";
+
+    public static final String FLINK_SQL_PROC_TIME_KEY_WORD = "proctime";
 }
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/table/data/AuronColumnarRowData.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/table/data/AuronColumnarRowData.java
new file mode 100644
index 00000000..2cbee209
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/table/data/AuronColumnarRowData.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.data;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.*;
+import org.apache.flink.table.data.binary.TypedSetters;
+import org.apache.flink.table.data.columnar.vector.BytesColumnVector;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.types.RowKind;
+
+/**
+ * A Columnar {@link RowData} implementation that supports Auron System 
Columns.
+ */
+@Internal
+public class AuronColumnarRowData implements RowData, TypedSetters {
+
+    private RowKind rowKind;
+    private VectorizedColumnBatch vectorizedColumnBatch;
+    private int rowId;
+    private int dataColStartIndex = 0;
+
+    public AuronColumnarRowData() {
+        this.rowKind = RowKind.INSERT;
+    }
+
+    public AuronColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch) {
+        this(vectorizedColumnBatch, 0);
+    }
+
+    public AuronColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch, 
int rowId) {
+        this.rowKind = RowKind.INSERT;
+        this.vectorizedColumnBatch = vectorizedColumnBatch;
+        this.rowId = rowId;
+    }
+
+    public void setVectorizedColumnBatch(VectorizedColumnBatch 
vectorizedColumnBatch) {
+        this.vectorizedColumnBatch = vectorizedColumnBatch;
+        this.rowId = 0;
+    }
+
+    public void setDataColStartIndex(int dataColStartIndex) {
+        checkArgument(dataColStartIndex >= 0, "dataColStartIndex should be 
greater than or equal to 0");
+        this.dataColStartIndex = dataColStartIndex;
+    }
+
+    public int getDataColStartIndex() {
+        return this.dataColStartIndex;
+    }
+
+    public void setRowId(int rowId) {
+        this.rowId = rowId;
+    }
+
+    public RowKind getRowKind() {
+        return this.rowKind;
+    }
+
+    public void setRowKind(RowKind kind) {
+        this.rowKind = kind;
+    }
+
+    public int getArity() {
+        return this.vectorizedColumnBatch.getArity() - dataColStartIndex;
+    }
+
+    public boolean isNullAt(int pos) {
+        return this.vectorizedColumnBatch.isNullAt(this.rowId, pos + 
dataColStartIndex);
+    }
+
+    public boolean getBoolean(int pos) {
+        return this.vectorizedColumnBatch.getBoolean(this.rowId, pos + 
dataColStartIndex);
+    }
+
+    public byte getByte(int pos) {
+        return this.vectorizedColumnBatch.getByte(this.rowId, pos + 
dataColStartIndex);
+    }
+
+    public short getShort(int pos) {
+        return this.vectorizedColumnBatch.getShort(this.rowId, pos + 
dataColStartIndex);
+    }
+
+    public int getInt(int pos) {
+        return this.vectorizedColumnBatch.getInt(this.rowId, pos + 
dataColStartIndex);
+    }
+
+    public long getLong(int pos) {
+        return this.vectorizedColumnBatch.getLong(this.rowId, pos + 
dataColStartIndex);
+    }
+
+    public float getFloat(int pos) {
+        return this.vectorizedColumnBatch.getFloat(this.rowId, pos + 
dataColStartIndex);
+    }
+
+    public double getDouble(int pos) {
+        return this.vectorizedColumnBatch.getDouble(this.rowId, pos + 
dataColStartIndex);
+    }
+
+    public StringData getString(int pos) {
+        BytesColumnVector.Bytes byteArray =
+                this.vectorizedColumnBatch.getByteArray(this.rowId, pos + 
dataColStartIndex);
+        return StringData.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+    }
+
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        return this.vectorizedColumnBatch.getDecimal(this.rowId, pos + 
dataColStartIndex, precision, scale);
+    }
+
+    public TimestampData getTimestamp(int pos, int precision) {
+        return this.vectorizedColumnBatch.getTimestamp(this.rowId, pos + 
dataColStartIndex, precision);
+    }
+
+    public <T> RawValueData<T> getRawValue(int pos) {
+        throw new UnsupportedOperationException("RawValueData is not 
supported.");
+    }
+
+    public byte[] getBinary(int pos) {
+        BytesColumnVector.Bytes byteArray =
+                this.vectorizedColumnBatch.getByteArray(this.rowId, pos + 
dataColStartIndex);
+        if (byteArray.len == byteArray.data.length) {
+            return byteArray.data;
+        } else {
+            byte[] ret = new byte[byteArray.len];
+            System.arraycopy(byteArray.data, byteArray.offset, ret, 0, 
byteArray.len);
+            return ret;
+        }
+    }
+
+    public RowData getRow(int pos, int numFields) {
+        return this.vectorizedColumnBatch.getRow(this.rowId, pos + 
dataColStartIndex);
+    }
+
+    public ArrayData getArray(int pos) {
+        return this.vectorizedColumnBatch.getArray(this.rowId, pos + 
dataColStartIndex);
+    }
+
+    public MapData getMap(int pos) {
+        return this.vectorizedColumnBatch.getMap(this.rowId, pos + 
dataColStartIndex);
+    }
+
+    public void setNullAt(int pos) {
+        throw new UnsupportedOperationException("Not support the operation!");
+    }
+
+    public void setBoolean(int pos, boolean value) {
+        throw new UnsupportedOperationException("Not support the operation!");
+    }
+
+    public void setByte(int pos, byte value) {
+        throw new UnsupportedOperationException("Not support the operation!");
+    }
+
+    public void setShort(int pos, short value) {
+        throw new UnsupportedOperationException("Not support the operation!");
+    }
+
+    public void setInt(int pos, int value) {
+        throw new UnsupportedOperationException("Not support the operation!");
+    }
+
+    public void setLong(int pos, long value) {
+        throw new UnsupportedOperationException("Not support the operation!");
+    }
+
+    public void setFloat(int pos, float value) {
+        throw new UnsupportedOperationException("Not support the operation!");
+    }
+
+    public void setDouble(int pos, double value) {
+        throw new UnsupportedOperationException("Not support the operation!");
+    }
+
+    public void setDecimal(int pos, DecimalData value, int precision) {
+        throw new UnsupportedOperationException("Not support the operation!");
+    }
+
+    public void setTimestamp(int pos, TimestampData value, int precision) {
+        throw new UnsupportedOperationException("Not support the operation!");
+    }
+
+    public boolean equals(Object o) {
+        throw new UnsupportedOperationException(
+                "AuronColumnarRowData do not support equals, please compare 
fields one by one!");
+    }
+
+    public int hashCode() {
+        throw new UnsupportedOperationException(
+                "AuronColumnarRowData do not support hashCode, please hash 
fields one by one!");
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/utils/SchemaConverters.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/utils/SchemaConverters.java
new file mode 100644
index 00000000..d18216da
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/utils/SchemaConverters.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.utils;
+
+import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
+
+import java.util.stream.Collectors;
+import org.apache.auron.protobuf.*;
+import org.apache.flink.table.types.logical.*;
+
+/**
+ * Converts Flink's {@link RowType} to Auron's {@link Schema}.
+ */
+public class SchemaConverters {
+
+    /**
+     * Converts a {@link RowType} to a {@link Schema}.
+     */
+    public static Schema convertToAuronSchema(RowType rowType, boolean 
withKafkaMeta) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        if (withKafkaMeta) {
+            schemaBuilder.addColumns(Field.newBuilder()
+                    .setName(KAFKA_AURON_META_PARTITION_ID)
+                    .setNullable(false)
+                    .setArrowType(ArrowType.newBuilder()
+                            .setINT32(EmptyMessage.getDefaultInstance())
+                            .build()));
+            schemaBuilder.addColumns(Field.newBuilder()
+                    .setName(KAFKA_AURON_META_OFFSET)
+                    .setNullable(false)
+                    .setArrowType(ArrowType.newBuilder()
+                            .setINT64(EmptyMessage.getDefaultInstance())
+                            .build()));
+            schemaBuilder.addColumns(Field.newBuilder()
+                    .setName(KAFKA_AURON_META_TIMESTAMP)
+                    .setNullable(false)
+                    .setArrowType(ArrowType.newBuilder()
+                            .setINT64(EmptyMessage.getDefaultInstance())
+                            .build()));
+        }
+        for (int i = 0; i < rowType.getFields().size(); i++) {
+            RowType.RowField rowField = rowType.getFields().get(i);
+            if 
(rowField.getName().equalsIgnoreCase(FLINK_SQL_PROC_TIME_KEY_WORD)) {
+                // proc time is nullable
+                schemaBuilder.addColumns(convertField(rowField, true));
+            } else {
+                schemaBuilder.addColumns(convertField(rowField, false));
+            }
+        }
+        return schemaBuilder.build();
+    }
+
+    public static Field convertField(RowType.RowField rowField, boolean 
isProctime) {
+        return Field.newBuilder()
+                .setName(rowField.getName())
+                .setNullable(isProctime ? true : 
rowField.getType().isNullable())
+                .setArrowType(convertToAuronArrowType(rowField.getType()))
+                .build();
+    }
+
+    public static ArrowType convertToAuronArrowType(LogicalType 
flinkLogicalType) {
+        ArrowType.Builder arrowTypeBuilder = ArrowType.newBuilder();
+        switch (flinkLogicalType.getTypeRoot()) {
+            case NULL:
+                arrowTypeBuilder.setNONE(EmptyMessage.getDefaultInstance());
+                break;
+            case BOOLEAN:
+                arrowTypeBuilder.setBOOL(EmptyMessage.getDefaultInstance());
+                break;
+            case TINYINT:
+                arrowTypeBuilder.setINT8(EmptyMessage.getDefaultInstance());
+                break;
+            case SMALLINT:
+                arrowTypeBuilder.setINT16(EmptyMessage.getDefaultInstance());
+                break;
+            case INTEGER:
+                arrowTypeBuilder.setINT32(EmptyMessage.getDefaultInstance());
+                break;
+            case BIGINT:
+                arrowTypeBuilder.setINT64(EmptyMessage.getDefaultInstance());
+                break;
+            case FLOAT:
+                arrowTypeBuilder.setFLOAT32(EmptyMessage.getDefaultInstance());
+                break;
+            case DOUBLE:
+                arrowTypeBuilder.setFLOAT64(EmptyMessage.getDefaultInstance());
+                break;
+            case CHAR:
+            case VARCHAR:
+                arrowTypeBuilder.setUTF8(EmptyMessage.getDefaultInstance());
+                break;
+            case BINARY:
+            case VARBINARY:
+                arrowTypeBuilder.setBINARY(EmptyMessage.getDefaultInstance());
+                break;
+            case DATE:
+                arrowTypeBuilder.setDATE32(EmptyMessage.getDefaultInstance());
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                // timezone is never used in native side
+                
arrowTypeBuilder.setTIMESTAMP(Timestamp.newBuilder().setTimeUnit(TimeUnit.Millisecond));
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                // timezone is never used in native side
+                
arrowTypeBuilder.setTIMESTAMP(Timestamp.newBuilder().setTimeUnit(TimeUnit.Millisecond));
+                break;
+            case DECIMAL:
+                // decimal
+                DecimalType t = (DecimalType) flinkLogicalType;
+                arrowTypeBuilder.setDECIMAL(Decimal.newBuilder()
+                        .setWhole(Math.max(t.getPrecision(), 1))
+                        .setFractional(t.getScale())
+                        .build());
+                break;
+            case ARRAY:
+                // array/list
+                ArrayType a = (ArrayType) flinkLogicalType;
+                arrowTypeBuilder.setLIST(List.newBuilder()
+                        .setFieldType(Field.newBuilder()
+                                .setName("item")
+                                
.setArrowType(convertToAuronArrowType(a.getElementType()))
+                                .setNullable(a.isNullable()))
+                        .build());
+                break;
+            case MAP:
+                MapType m = (MapType) flinkLogicalType;
+                arrowTypeBuilder.setMAP(Map.newBuilder()
+                        .setKeyType(Field.newBuilder()
+                                .setName("key")
+                                
.setArrowType(convertToAuronArrowType(m.getKeyType()))
+                                .setNullable(false))
+                        .setValueType(Field.newBuilder()
+                                .setName("value")
+                                
.setArrowType(convertToAuronArrowType(m.getValueType()))
+                                .setNullable(m.getValueType().isNullable()))
+                        .build());
+                break;
+            case ROW:
+                // StructType
+                RowType r = (RowType) flinkLogicalType;
+                arrowTypeBuilder.setSTRUCT(Struct.newBuilder()
+                        .addAllSubFieldTypes(r.getFields().stream()
+                                .map(e -> Field.newBuilder()
+                                        
.setArrowType(convertToAuronArrowType(e.getType()))
+                                        .setName(e.getName())
+                                        .setNullable(e.getType().isNullable())
+                                        .build())
+                                .collect(Collectors.toList()))
+                        .build());
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Data type conversion not implemented " + 
flinkLogicalType.asSummaryString());
+        }
+        return arrowTypeBuilder.build();
+    }
+}


Reply via email to