This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new f899c845 [AURON #2060] Introduce Auron Flink Kafka TableSource (#2082)
f899c845 is described below
commit f899c8456387c012881e78744eb11122c4922a50
Author: zhangmang <[email protected]>
AuthorDate: Tue Mar 10 19:29:23 2026 +0800
[AURON #2060] Introduce Auron Flink Kafka TableSource (#2082)
# Which issue does this PR close?
Closes #2060
# Rationale for this change
* Implement the integration between the Flink Kafka source and the Auron native
engine
# What changes are included in this PR?
* add AuronKafkaSourceFunction
* add AuronColumnarRowData
* add SchemaConverters
* modify FlinkArrowReader
* modify FlinkArrowUtils
* FlinkAuronConfiguration
* AuronKafkaDynamicTableFactory
* AuronKafkaDynamicTableSource
# Are there any user-facing changes?
* No
# How was this patch tested?
* There is currently no Kafka environment for integration testing, so automated
testing is not possible.
---
auron-flink-extension/auron-flink-runtime/pom.xml | 13 ++
.../apache/auron/flink/arrow/FlinkArrowReader.java | 51 ++++-
.../apache/auron/flink/arrow/FlinkArrowUtils.java | 4 +
.../configuration/FlinkAuronConfiguration.java | 6 +
.../kafka/AuronKafkaDynamicTableFactory.java | 6 +-
.../kafka/AuronKafkaDynamicTableSource.java | 45 ++++-
.../connector/kafka/AuronKafkaSourceFunction.java | 187 +++++++++++++++++++
.../flink/connector/kafka/KafkaConstants.java | 6 +
.../flink/table/data/AuronColumnarRowData.java | 206 +++++++++++++++++++++
.../apache/auron/flink/utils/SchemaConverters.java | 171 +++++++++++++++++
10 files changed, 678 insertions(+), 17 deletions(-)
diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml
b/auron-flink-extension/auron-flink-runtime/pom.xml
index 4998e04c..e3695bcd 100644
--- a/auron-flink-extension/auron-flink-runtime/pom.xml
+++ b/auron-flink-extension/auron-flink-runtime/pom.xml
@@ -58,6 +58,19 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Table API Java dependencies (not included in the uber) -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- Test dependencies -->
<dependency>
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
index aa0280af..04d35a12 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
@@ -51,6 +51,7 @@ import
org.apache.auron.flink.arrow.vectors.ArrowTimestampColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowTinyIntColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowVarBinaryColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowVarCharColumnVector;
+import org.apache.auron.flink.table.data.AuronColumnarRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.columnar.ColumnarRowData;
import org.apache.flink.table.data.columnar.vector.ColumnVector;
@@ -82,13 +83,21 @@ public class FlinkArrowReader implements AutoCloseable {
private final RowType rowType;
private ColumnVector[] columnVectors;
private VectorizedColumnBatch batch;
- private ColumnarRowData reusableRow;
+ private AuronColumnarRowData reusableRow;
private VectorSchemaRoot root;
private FlinkArrowReader(ColumnVector[] columnVectors, VectorSchemaRoot
root, RowType rowType) {
+ this(columnVectors, root, rowType, 0);
+ }
+
+ private FlinkArrowReader(
+ ColumnVector[] columnVectors, VectorSchemaRoot root, RowType
rowType, int dataColStartIndex) {
this.columnVectors = columnVectors;
this.batch = new VectorizedColumnBatch(columnVectors);
- this.reusableRow = new ColumnarRowData(batch);
+ this.reusableRow = new AuronColumnarRowData(batch);
+ if (dataColStartIndex > 0) {
+ this.reusableRow.setDataColStartIndex(dataColStartIndex);
+ }
this.root = root;
this.rowType = rowType;
}
@@ -107,6 +116,24 @@ public class FlinkArrowReader implements AutoCloseable {
* @throws UnsupportedOperationException if a LogicalType is not supported
*/
public static FlinkArrowReader create(VectorSchemaRoot root, RowType
rowType) {
+ return create(root, rowType, 0);
+ }
+
+ /**
+ * Creates a {@link FlinkArrowReader} from a {@link VectorSchemaRoot} and
{@link RowType}.
+ *
+ * <p>The RowType must match the schema of the VectorSchemaRoot (same
number of fields, matching
+ * types). Each Arrow field vector is wrapped in the appropriate Flink
{@link ColumnVector}
+ * implementation based on the corresponding Flink {@link LogicalType}.
+ *
+ * @param root the Arrow VectorSchemaRoot containing the data
+ * @param rowType the Flink RowType describing the schema
+ * @param dataColStartIndex the index of the first user data column
+ * @return a new FlinkArrowReader
+ * @throws IllegalArgumentException if field counts do not match
+ * @throws UnsupportedOperationException if a LogicalType is not supported
+ */
+ public static FlinkArrowReader create(VectorSchemaRoot root, RowType
rowType, int dataColStartIndex) {
Preconditions.checkNotNull(root, "root must not be null");
Preconditions.checkNotNull(rowType, "rowType must not be null");
List<FieldVector> fieldVectors = root.getFieldVectors();
@@ -119,7 +146,7 @@ public class FlinkArrowReader implements AutoCloseable {
for (int i = 0; i < fieldVectors.size(); i++) {
columns[i] = createColumnVector(fieldVectors.get(i),
fields.get(i).getType());
}
- return new FlinkArrowReader(columns, root, rowType);
+ return new FlinkArrowReader(columns, root, rowType, dataColStartIndex);
}
/**
@@ -152,6 +179,19 @@ public class FlinkArrowReader implements AutoCloseable {
* @param newRoot the new VectorSchemaRoot, must not be null
*/
public void reset(VectorSchemaRoot newRoot) {
+ reset(newRoot, 0);
+ }
+
+ /**
+ * Resets the reader to use a new {@link VectorSchemaRoot} with the same
schema. Recreates
+ * column vector wrappers for the new root's field vectors.
+ *
+ * <p>The new root must have the same schema (same number and types of
fields) as the original.
+ *
+ * @param newRoot the new VectorSchemaRoot, must not be null
+ * @param dataColStartIndex the index of the first user data column
+ */
+ public void reset(VectorSchemaRoot newRoot, int dataColStartIndex) {
Preconditions.checkNotNull(newRoot, "newRoot must not be null");
this.root = newRoot;
List<FieldVector> newVectors = newRoot.getFieldVectors();
@@ -166,7 +206,10 @@ public class FlinkArrowReader implements AutoCloseable {
createColumnVector(newVectors.get(i),
fields.get(i).getType());
}
this.batch = new VectorizedColumnBatch(columnVectors);
- this.reusableRow = new ColumnarRowData(batch);
+ this.reusableRow = new AuronColumnarRowData(batch);
+ if (dataColStartIndex > 0) {
+ this.reusableRow.setDataColStartIndex(dataColStartIndex);
+ }
}
/**
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
index 273c66c0..ec027ec4 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
@@ -92,6 +92,10 @@ public final class FlinkArrowUtils {
Runtime.getRuntime().addShutdownHook(new
Thread(ROOT_ALLOCATOR::close));
}
+ public static RootAllocator getRootAllocator() {
+ return ROOT_ALLOCATOR;
+ }
+
/**
* Creates a child allocator from the root allocator.
*
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
index 26a81d3b..69d76bac 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
@@ -34,6 +34,12 @@ public class FlinkAuronConfiguration extends
AuronConfiguration {
// When using getOptional, the prefix will be automatically completed. If
you only need to print the Option key,
// please manually add the prefix.
public static final String FLINK_PREFIX = "flink.";
+
+ public static final ConfigOption<Long> NATIVE_MEMORY_SIZE = new
ConfigOption<>(Long.class)
+ .withKey("auron.native.memory.size")
+ .withDescription("The auron native memory size to use.")
+ .withDefaultValue(256 * 1024 * 1024L); // 256 MB
+
private final Configuration flinkConfig;
public FlinkAuronConfiguration() {
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
index ce9c09a3..177a7c39 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
@@ -102,16 +102,14 @@ public class AuronKafkaDynamicTableFactory implements
DynamicTableSourceFactory
formatConfig.put(KAFKA_PB_FORMAT_ROOT_MESSAGE_NAME_FIELD,
tableOptions.get(PB_ROOT_MESSAGE_NAME));
formatConfig.put(KAFKA_PB_FORMAT_SKIP_FIELDS_FIELD,
tableOptions.get(PB_SKIP_FIELDS));
}
- String formatConfigJson = mapper.writeValueAsString(formatConfig);
return new AuronKafkaDynamicTableSource(
context.getCatalogTable().getSchema().toPhysicalRowDataType(),
tableOptions.get(TOPIC),
kafkaPropertiesJson,
format,
- formatConfigJson,
+ formatConfig,
tableOptions.get(BUFFER_SIZE),
- tableOptions.get(START_UP_MODE),
- tableOptions.get(NESTED_COLS_FIELD_MAPPING));
+ tableOptions.get(START_UP_MODE));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create Auron Kafka
dynamic table source", e);
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index 80d536b7..5c7be005 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -16,9 +16,16 @@
*/
package org.apache.auron.flink.connector.kafka;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -33,30 +40,27 @@ public class AuronKafkaDynamicTableSource implements
ScanTableSource {
private final String kafkaTopic;
private final String kafkaPropertiesJson;
private final String format;
- private final String formatConfigJson;
+ private final Map<String, String> formatConfig;
private final int bufferSize;
private final String startupMode;
- private final String nestedColsMappingJson;
public AuronKafkaDynamicTableSource(
DataType physicalDataType,
String kafkaTopic,
String kafkaPropertiesJson,
String format,
- String formatConfigJson,
+ Map<String, String> formatConfig,
int bufferSize,
- String startupMode,
- String nestedColsMappingJson) {
+ String startupMode) {
final LogicalType physicalType = physicalDataType.getLogicalType();
Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row
data type expected.");
this.physicalDataType = physicalDataType;
this.kafkaTopic = kafkaTopic;
this.kafkaPropertiesJson = kafkaPropertiesJson;
this.format = format;
- this.formatConfigJson = formatConfigJson;
+ this.formatConfig = formatConfig;
this.bufferSize = bufferSize;
this.startupMode = startupMode;
- this.nestedColsMappingJson = nestedColsMappingJson;
}
@Override
@@ -66,12 +70,35 @@ public class AuronKafkaDynamicTableSource implements
ScanTableSource {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext)
{
- return null;
+ String auronOperatorId = "AuronKafkaSource-" +
UUID.randomUUID().toString();
+ AuronKafkaSourceFunction sourceFunction = new AuronKafkaSourceFunction(
+ physicalDataType.getLogicalType(),
+ auronOperatorId,
+ kafkaTopic,
+ kafkaPropertiesJson,
+ format,
+ formatConfig,
+ bufferSize,
+ startupMode);
+ return new DataStreamScanProvider() {
+
+ @Override
+ public DataStream<RowData> produceDataStream(
+ ProviderContext providerContext,
StreamExecutionEnvironment execEnv) {
+ return execEnv.addSource(sourceFunction);
+ }
+
+ @Override
+ public boolean isBounded() {
+ return false;
+ }
+ };
}
@Override
public DynamicTableSource copy() {
- return null;
+ return new AuronKafkaDynamicTableSource(
+ physicalDataType, kafkaTopic, kafkaPropertiesJson, format,
formatConfig, bufferSize, startupMode);
}
@Override
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
new file mode 100644
index 00000000..369f4b1d
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.connector.kafka;
+
+import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.*;
+import org.apache.auron.flink.arrow.FlinkArrowReader;
+import org.apache.auron.flink.arrow.FlinkArrowUtils;
+import org.apache.auron.flink.configuration.FlinkAuronConfiguration;
+import org.apache.auron.flink.runtime.operator.FlinkAuronFunction;
+import org.apache.auron.flink.utils.SchemaConverters;
+import org.apache.auron.jni.AuronAdaptor;
+import org.apache.auron.jni.AuronCallNativeWrapper;
+import org.apache.auron.metric.MetricNode;
+import org.apache.auron.protobuf.KafkaFormat;
+import org.apache.auron.protobuf.KafkaScanExecNode;
+import org.apache.auron.protobuf.KafkaStartupMode;
+import org.apache.auron.protobuf.PhysicalPlanNode;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Auron Kafka source function.
+ */
+public class AuronKafkaSourceFunction extends
RichParallelSourceFunction<RowData> implements FlinkAuronFunction {
+ private static final Logger LOG =
LoggerFactory.getLogger(AuronKafkaSourceFunction.class);
+
+ private final LogicalType outputType;
+ private final String auronOperatorId;
+ private final String topic;
+ private final String kafkaPropertiesJson;
+ private final String format;
+ private final Map<String, String> formatConfig;
+ private final int bufferSize;
+ private final String startupMode;
+ private transient PhysicalPlanNode physicalPlanNode;
+ private volatile boolean isRunning;
+ private transient String auronOperatorIdWithSubtaskIndex;
+ private transient MetricNode nativeMetric;
+
+ public AuronKafkaSourceFunction(
+ LogicalType outputType,
+ String auronOperatorId,
+ String topic,
+ String kafkaPropertiesJson,
+ String format,
+ Map<String, String> formatConfig,
+ int bufferSize,
+ String startupMode) {
+ this.outputType = outputType;
+ this.auronOperatorId = auronOperatorId;
+ this.topic = topic;
+ this.kafkaPropertiesJson = kafkaPropertiesJson;
+ this.format = format;
+ this.formatConfig = formatConfig;
+ this.bufferSize = bufferSize;
+ this.startupMode = startupMode;
+ }
+
+ @Override
+ public void open(Configuration config) throws Exception {
+ // init auron plan
+ PhysicalPlanNode.Builder sourcePlan = PhysicalPlanNode.newBuilder();
+ KafkaScanExecNode.Builder scanExecNode =
KafkaScanExecNode.newBuilder();
+ scanExecNode.setKafkaTopic(this.topic);
+ scanExecNode.setKafkaPropertiesJson(this.kafkaPropertiesJson);
+
scanExecNode.setDataFormat(KafkaFormat.valueOf(this.format.toUpperCase(Locale.ROOT)));
+ ObjectMapper mapper = new ObjectMapper();
+
scanExecNode.setFormatConfigJson(mapper.writeValueAsString(formatConfig));
+ scanExecNode.setBatchSize(this.bufferSize);
+ if
(this.format.equalsIgnoreCase(KafkaConstants.KAFKA_FORMAT_PROTOBUF)) {
+ // copy pb desc file
+ ClassLoader userClassloader =
Thread.currentThread().getContextClassLoader();
+ String pbDescFileName =
formatConfig.get(KafkaConstants.KAFKA_PB_FORMAT_PB_DESC_FILE_FIELD);
+ InputStream in =
userClassloader.getResourceAsStream(pbDescFileName);
+ String pwd = System.getenv("PWD");
+ if (new File(pwd).exists()) {
+ File descFile = new File(pwd + "/" + pbDescFileName);
+ if (!descFile.exists()) {
+ LOG.info("Auron kafka source writer pb desc file: " +
pbDescFileName);
+ FileUtils.copyInputStreamToFile(in, descFile);
+ } else {
+ LOG.warn("Auron kafka source pb desc file already exist,
skip copy " + pbDescFileName);
+ }
+ } else {
+ throw new RuntimeException("PWD is not exist");
+ }
+ }
+ // add kafka meta fields
+ scanExecNode.setSchema(SchemaConverters.convertToAuronSchema((RowType)
outputType, true));
+ auronOperatorIdWithSubtaskIndex =
+ this.auronOperatorId + "-" +
getRuntimeContext().getIndexOfThisSubtask();
+ scanExecNode.setAuronOperatorId(auronOperatorIdWithSubtaskIndex);
+ scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode));
+ sourcePlan.setKafkaScan(scanExecNode.build());
+ this.physicalPlanNode = sourcePlan.build();
+ this.isRunning = true;
+ }
+
+ @Override
+ public void run(SourceContext<RowData> sourceContext) throws Exception {
+ nativeMetric = new MetricNode(new ArrayList<>()) {
+ @Override
+ public void add(String name, long value) {
+ // TODO Integration with Flink metrics
+ LOG.info(String.format("Metric Auron Source: %s = %s", name,
value));
+ }
+ };
+ List<RowType.RowField> fieldList = new LinkedList<>();
+ fieldList.add(new RowType.RowField(KAFKA_AURON_META_PARTITION_ID, new
IntType(false)));
+ fieldList.add(new RowType.RowField(KAFKA_AURON_META_OFFSET, new
BigIntType(false)));
+ fieldList.add(new RowType.RowField(KAFKA_AURON_META_TIMESTAMP, new
BigIntType(false)));
+ fieldList.addAll(((RowType) outputType).getFields());
+ RowType auronOutputRowType = new RowType(fieldList);
+ while (this.isRunning) {
+ AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
+ FlinkArrowUtils.getRootAllocator(),
+ physicalPlanNode,
+ nativeMetric,
+ 0,
+ 0,
+ 0,
+ AuronAdaptor.getInstance()
+ .getAuronConfiguration()
+
.getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
+ while (wrapper.loadNextBatch(batch -> {
+ FlinkArrowReader arrowReader = FlinkArrowReader.create(batch,
auronOutputRowType, 3);
+ for (int i = 0; i < batch.getRowCount(); i++) {
+ sourceContext.collect(arrowReader.read(i));
+ }
+ })) {}
+ ;
+ }
+ LOG.info("Auron kafka source run end");
+ }
+
+ @Override
+ public void cancel() {
+ this.isRunning = false;
+ }
+
+ @Override
+ public List<PhysicalPlanNode> getPhysicalPlanNodes() {
+ return Collections.singletonList(physicalPlanNode);
+ }
+
+ @Override
+ public RowType getOutputType() {
+ return (RowType) outputType;
+ }
+
+ @Override
+ public String getAuronOperatorId() {
+ return auronOperatorId;
+ }
+
+ @Override
+ public MetricNode getMetricNode() {
+ return nativeMetric;
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
index d733ccbe..bf249102 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
@@ -232,4 +232,10 @@ public class KafkaConstants {
public static final String KAFKA_PB_FORMAT_ROOT_MESSAGE_NAME_FIELD =
"root_message_name";
public static final String KAFKA_PB_FORMAT_SKIP_FIELDS_FIELD =
"skip_fields";
public static final String KAFKA_PB_FORMAT_NESTED_COL_MAPPING_FIELD =
"nested_col_mapping";
+
+ public static final String KAFKA_AURON_META_PARTITION_ID =
"serialized_kafka_records_partition";
+ public static final String KAFKA_AURON_META_OFFSET =
"serialized_kafka_records_offset";
+ public static final String KAFKA_AURON_META_TIMESTAMP =
"serialized_kafka_records_timestamp";
+
+ public static final String FLINK_SQL_PROC_TIME_KEY_WORD = "proctime";
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/table/data/AuronColumnarRowData.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/table/data/AuronColumnarRowData.java
new file mode 100644
index 00000000..2cbee209
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/table/data/AuronColumnarRowData.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.data;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.*;
+import org.apache.flink.table.data.binary.TypedSetters;
+import org.apache.flink.table.data.columnar.vector.BytesColumnVector;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.types.RowKind;
+
+/**
+ * A Columnar {@link RowData} implementation that supports Auron System
Columns.
+ */
+@Internal
+public class AuronColumnarRowData implements RowData, TypedSetters {
+
+ private RowKind rowKind;
+ private VectorizedColumnBatch vectorizedColumnBatch;
+ private int rowId;
+ private int dataColStartIndex = 0;
+
+ public AuronColumnarRowData() {
+ this.rowKind = RowKind.INSERT;
+ }
+
+ public AuronColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch) {
+ this(vectorizedColumnBatch, 0);
+ }
+
+ public AuronColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch,
int rowId) {
+ this.rowKind = RowKind.INSERT;
+ this.vectorizedColumnBatch = vectorizedColumnBatch;
+ this.rowId = rowId;
+ }
+
+ public void setVectorizedColumnBatch(VectorizedColumnBatch
vectorizedColumnBatch) {
+ this.vectorizedColumnBatch = vectorizedColumnBatch;
+ this.rowId = 0;
+ }
+
+ public void setDataColStartIndex(int dataColStartIndex) {
+ checkArgument(dataColStartIndex >= 0, "dataColStartIndex should be
greater than or equal to 0");
+ this.dataColStartIndex = dataColStartIndex;
+ }
+
+ public int getDataColStartIndex() {
+ return this.dataColStartIndex;
+ }
+
+ public void setRowId(int rowId) {
+ this.rowId = rowId;
+ }
+
+ public RowKind getRowKind() {
+ return this.rowKind;
+ }
+
+ public void setRowKind(RowKind kind) {
+ this.rowKind = kind;
+ }
+
+ public int getArity() {
+ return this.vectorizedColumnBatch.getArity() - dataColStartIndex;
+ }
+
+ public boolean isNullAt(int pos) {
+ return this.vectorizedColumnBatch.isNullAt(this.rowId, pos +
dataColStartIndex);
+ }
+
+ public boolean getBoolean(int pos) {
+ return this.vectorizedColumnBatch.getBoolean(this.rowId, pos +
dataColStartIndex);
+ }
+
+ public byte getByte(int pos) {
+ return this.vectorizedColumnBatch.getByte(this.rowId, pos +
dataColStartIndex);
+ }
+
+ public short getShort(int pos) {
+ return this.vectorizedColumnBatch.getShort(this.rowId, pos +
dataColStartIndex);
+ }
+
+ public int getInt(int pos) {
+ return this.vectorizedColumnBatch.getInt(this.rowId, pos +
dataColStartIndex);
+ }
+
+ public long getLong(int pos) {
+ return this.vectorizedColumnBatch.getLong(this.rowId, pos +
dataColStartIndex);
+ }
+
+ public float getFloat(int pos) {
+ return this.vectorizedColumnBatch.getFloat(this.rowId, pos +
dataColStartIndex);
+ }
+
+ public double getDouble(int pos) {
+ return this.vectorizedColumnBatch.getDouble(this.rowId, pos +
dataColStartIndex);
+ }
+
+ public StringData getString(int pos) {
+ BytesColumnVector.Bytes byteArray =
+ this.vectorizedColumnBatch.getByteArray(this.rowId, pos +
dataColStartIndex);
+ return StringData.fromBytes(byteArray.data, byteArray.offset,
byteArray.len);
+ }
+
+ public DecimalData getDecimal(int pos, int precision, int scale) {
+ return this.vectorizedColumnBatch.getDecimal(this.rowId, pos +
dataColStartIndex, precision, scale);
+ }
+
+ public TimestampData getTimestamp(int pos, int precision) {
+ return this.vectorizedColumnBatch.getTimestamp(this.rowId, pos +
dataColStartIndex, precision);
+ }
+
+ public <T> RawValueData<T> getRawValue(int pos) {
+ throw new UnsupportedOperationException("RawValueData is not
supported.");
+ }
+
+ public byte[] getBinary(int pos) {
+ BytesColumnVector.Bytes byteArray =
+ this.vectorizedColumnBatch.getByteArray(this.rowId, pos +
dataColStartIndex);
+ if (byteArray.len == byteArray.data.length) {
+ return byteArray.data;
+ } else {
+ byte[] ret = new byte[byteArray.len];
+ System.arraycopy(byteArray.data, byteArray.offset, ret, 0,
byteArray.len);
+ return ret;
+ }
+ }
+
+ public RowData getRow(int pos, int numFields) {
+ return this.vectorizedColumnBatch.getRow(this.rowId, pos +
dataColStartIndex);
+ }
+
+ public ArrayData getArray(int pos) {
+ return this.vectorizedColumnBatch.getArray(this.rowId, pos +
dataColStartIndex);
+ }
+
+ public MapData getMap(int pos) {
+ return this.vectorizedColumnBatch.getMap(this.rowId, pos +
dataColStartIndex);
+ }
+
+ public void setNullAt(int pos) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ public void setBoolean(int pos, boolean value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ public void setByte(int pos, byte value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ public void setShort(int pos, short value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ public void setInt(int pos, int value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ public void setLong(int pos, long value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ public void setFloat(int pos, float value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ public void setDouble(int pos, double value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ public void setDecimal(int pos, DecimalData value, int precision) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ public void setTimestamp(int pos, TimestampData value, int precision) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ public boolean equals(Object o) {
+ throw new UnsupportedOperationException(
+ "AuronColumnarRowData do not support equals, please compare
fields one by one!");
+ }
+
+ public int hashCode() {
+ throw new UnsupportedOperationException(
+ "AuronColumnarRowData do not support hashCode, please hash
fields one by one!");
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/utils/SchemaConverters.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/utils/SchemaConverters.java
new file mode 100644
index 00000000..d18216da
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/utils/SchemaConverters.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.utils;
+
+import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
+
+import java.util.stream.Collectors;
+import org.apache.auron.protobuf.*;
+import org.apache.flink.table.types.logical.*;
+
+/**
+ * Converts Flink's {@link RowType} to Auron's {@link Schema}.
+ */
+public class SchemaConverters {
+
+ /**
+ * Converts a {@link RowType} to a {@link Schema}.
+ */
+ public static Schema convertToAuronSchema(RowType rowType, boolean
withKafkaMeta) {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ if (withKafkaMeta) {
+ schemaBuilder.addColumns(Field.newBuilder()
+ .setName(KAFKA_AURON_META_PARTITION_ID)
+ .setNullable(false)
+ .setArrowType(ArrowType.newBuilder()
+ .setINT32(EmptyMessage.getDefaultInstance())
+ .build()));
+ schemaBuilder.addColumns(Field.newBuilder()
+ .setName(KAFKA_AURON_META_OFFSET)
+ .setNullable(false)
+ .setArrowType(ArrowType.newBuilder()
+ .setINT64(EmptyMessage.getDefaultInstance())
+ .build()));
+ schemaBuilder.addColumns(Field.newBuilder()
+ .setName(KAFKA_AURON_META_TIMESTAMP)
+ .setNullable(false)
+ .setArrowType(ArrowType.newBuilder()
+ .setINT64(EmptyMessage.getDefaultInstance())
+ .build()));
+ }
+ for (int i = 0; i < rowType.getFields().size(); i++) {
+ RowType.RowField rowField = rowType.getFields().get(i);
+ if
(rowField.getName().equalsIgnoreCase(FLINK_SQL_PROC_TIME_KEY_WORD)) {
+ // proc time is nullable
+ schemaBuilder.addColumns(convertField(rowField, true));
+ } else {
+ schemaBuilder.addColumns(convertField(rowField, false));
+ }
+ }
+ return schemaBuilder.build();
+ }
+
+ public static Field convertField(RowType.RowField rowField, boolean
isProctime) {
+ return Field.newBuilder()
+ .setName(rowField.getName())
+ .setNullable(isProctime ? true :
rowField.getType().isNullable())
+ .setArrowType(convertToAuronArrowType(rowField.getType()))
+ .build();
+ }
+
+ public static ArrowType convertToAuronArrowType(LogicalType
flinkLogicalType) {
+ ArrowType.Builder arrowTypeBuilder = ArrowType.newBuilder();
+ switch (flinkLogicalType.getTypeRoot()) {
+ case NULL:
+ arrowTypeBuilder.setNONE(EmptyMessage.getDefaultInstance());
+ break;
+ case BOOLEAN:
+ arrowTypeBuilder.setBOOL(EmptyMessage.getDefaultInstance());
+ break;
+ case TINYINT:
+ arrowTypeBuilder.setINT8(EmptyMessage.getDefaultInstance());
+ break;
+ case SMALLINT:
+ arrowTypeBuilder.setINT16(EmptyMessage.getDefaultInstance());
+ break;
+ case INTEGER:
+ arrowTypeBuilder.setINT32(EmptyMessage.getDefaultInstance());
+ break;
+ case BIGINT:
+ arrowTypeBuilder.setINT64(EmptyMessage.getDefaultInstance());
+ break;
+ case FLOAT:
+ arrowTypeBuilder.setFLOAT32(EmptyMessage.getDefaultInstance());
+ break;
+ case DOUBLE:
+ arrowTypeBuilder.setFLOAT64(EmptyMessage.getDefaultInstance());
+ break;
+ case CHAR:
+ case VARCHAR:
+ arrowTypeBuilder.setUTF8(EmptyMessage.getDefaultInstance());
+ break;
+ case BINARY:
+ case VARBINARY:
+ arrowTypeBuilder.setBINARY(EmptyMessage.getDefaultInstance());
+ break;
+ case DATE:
+ arrowTypeBuilder.setDATE32(EmptyMessage.getDefaultInstance());
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ // timezone is never used in native side
+
arrowTypeBuilder.setTIMESTAMP(Timestamp.newBuilder().setTimeUnit(TimeUnit.Millisecond));
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ // timezone is never used in native side
+
arrowTypeBuilder.setTIMESTAMP(Timestamp.newBuilder().setTimeUnit(TimeUnit.Millisecond));
+ break;
+ case DECIMAL:
+ // decimal
+ DecimalType t = (DecimalType) flinkLogicalType;
+ arrowTypeBuilder.setDECIMAL(Decimal.newBuilder()
+ .setWhole(Math.max(t.getPrecision(), 1))
+ .setFractional(t.getScale())
+ .build());
+ break;
+ case ARRAY:
+ // array/list
+ ArrayType a = (ArrayType) flinkLogicalType;
+ arrowTypeBuilder.setLIST(List.newBuilder()
+ .setFieldType(Field.newBuilder()
+ .setName("item")
+
.setArrowType(convertToAuronArrowType(a.getElementType()))
+ .setNullable(a.isNullable()))
+ .build());
+ break;
+ case MAP:
+ MapType m = (MapType) flinkLogicalType;
+ arrowTypeBuilder.setMAP(Map.newBuilder()
+ .setKeyType(Field.newBuilder()
+ .setName("key")
+
.setArrowType(convertToAuronArrowType(m.getKeyType()))
+ .setNullable(false))
+ .setValueType(Field.newBuilder()
+ .setName("value")
+
.setArrowType(convertToAuronArrowType(m.getValueType()))
+ .setNullable(m.getValueType().isNullable()))
+ .build());
+ break;
+ case ROW:
+ // StructType
+ RowType r = (RowType) flinkLogicalType;
+ arrowTypeBuilder.setSTRUCT(Struct.newBuilder()
+ .addAllSubFieldTypes(r.getFields().stream()
+ .map(e -> Field.newBuilder()
+
.setArrowType(convertToAuronArrowType(e.getType()))
+ .setName(e.getName())
+ .setNullable(e.getType().isNullable())
+ .build())
+ .collect(Collectors.toList()))
+ .build());
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Data type conversion not implemented " +
flinkLogicalType.asSummaryString());
+ }
+ return arrowTypeBuilder.build();
+ }
+}