This is an automated email from the ASF dual-hosted git repository.
qingwzhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new 53766ddd [ISSUE-583] feat: add paimon source connector (#584)
53766ddd is described below
commit 53766ddda61683c6501e7c7068f1daf91cfba370
Author: Qingwen Zhao <[email protected]>
AuthorDate: Tue Aug 12 16:41:48 2025 +0800
[ISSUE-583] feat: add paimon source connector (#584)
* feat: add paimon source connector
---
.../geaflow-dsl-connector-paimon/pom.xml | 44 +++
.../dsl/connector/paimon/PaimonConfigKeys.java | 52 ++++
.../connector/paimon/PaimonRecordDeserializer.java | 85 ++++++
.../dsl/connector/paimon/PaimonTableConnector.java | 39 +++
.../dsl/connector/paimon/PaimonTableSource.java | 312 +++++++++++++++++++++
...apache.geaflow.dsl.connector.api.TableConnector | 19 ++
.../connector/paimon/PaimonTableConnectorTest.java | 168 +++++++++++
geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml | 1 +
8 files changed, 720 insertions(+)
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/pom.xml
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/pom.xml
new file mode 100644
index 00000000..987592ce
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-dsl-connector</artifactId>
+ <version>0.6.8-SNAPSHOT</version>
+ </parent>
+ <artifactId>geaflow-dsl-connector-paimon</artifactId>
+ <name>geaflow-dsl-connector-paimon</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-bundle</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-dsl-connector-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
new file mode 100644
index 00000000..106ef026
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import org.apache.geaflow.common.config.ConfigKey;
+import org.apache.geaflow.common.config.ConfigKeys;
+
+/**
+ * Configuration keys understood by the paimon table connector.
+ *
+ * <p>None of the keys declares a default value.
+ */
+public class PaimonConfigKeys {
+
+    /** Warehouse path used to create the paimon catalog. */
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_WAREHOUSE =
+        ConfigKeys.key("geaflow.dsl.paimon.warehouse")
+            .noDefaultValue()
+            .description("The warehouse path for paimon catalog creation.");
+
+    /** JSON-encoded catalog options used to create the paimon catalog. */
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_OPTIONS_JSON =
+        ConfigKeys.key("geaflow.dsl.paimon.options.json")
+            .noDefaultValue()
+            .description("The options json for paimon catalog creation.");
+
+    /** JSON-encoded configuration entries used to create the paimon catalog. */
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_CONFIGURATION_JSON =
+        ConfigKeys.key("geaflow.dsl.paimon.configuration.json")
+            .noDefaultValue()
+            .description("The configuration json for paimon catalog creation.");
+
+    /** Name of the database that holds the paimon table. */
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_DATABASE_NAME =
+        ConfigKeys.key("geaflow.dsl.paimon.database.name")
+            .noDefaultValue()
+            .description("The database name for paimon table.");
+
+    /** Name of the paimon table to read. */
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_TABLE_NAME =
+        ConfigKeys.key("geaflow.dsl.paimon.table.name")
+            .noDefaultValue()
+            .description("The paimon table name to read.");
+
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
new file mode 100644
index 00000000..d38c360d
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.paimon.data.InternalRow;
+
+/**
+ * Converts a paimon {@link InternalRow} into a GeaFlow {@link Row}.
+ *
+ * <p>Fields are read positionally according to the data schema handed to
+ * {@link #init(Configuration, StructType)}. Boolean, byte, double, float,
+ * integer, long, string and binary-string columns are supported; any other
+ * column type raises a {@link GeaFlowDSLException}.
+ */
+public class PaimonRecordDeserializer implements TableDeserializer<Object> {
+
+    private StructType schema;
+    private TableSchema tableSchema;
+
+    /**
+     * Remembers the table schema and its data schema.
+     *
+     * @param conf   table configuration; not read by this deserializer
+     * @param schema must be a {@link TableSchema}, it is cast without a check
+     */
+    @Override
+    public void init(Configuration conf, StructType schema) {
+        this.tableSchema = (TableSchema) schema;
+        this.schema = this.tableSchema.getDataSchema();
+    }
+
+    /**
+     * Deserializes one paimon record into a single-element row list.
+     *
+     * @param record the record to convert; must be an {@link InternalRow}
+     * @return a singleton list holding the converted row; SQL NULL fields become {@code null}
+     * @throws GeaFlowDSLException if a column has an unsupported type
+     */
+    @Override
+    public List<Row> deserialize(Object record) {
+        InternalRow internalRow = (InternalRow) record;
+        int fieldCount = schema.size();
+        assert internalRow.getFieldCount() == fieldCount;
+        Object[] values = new Object[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            TableField field = this.schema.getField(i);
+            values[i] = readField(internalRow, i, field.getType().getName());
+        }
+        return Collections.singletonList(ObjectRow.create(values));
+    }
+
+    /**
+     * Reads the field at {@code pos} using the getter that matches {@code typeName}.
+     *
+     * <p>The typed getters of {@link InternalRow} must not be used on a NULL field,
+     * so nullness is checked first and reported as {@code null}. The type is still
+     * validated for NULL fields so unsupported columns fail on the first record.
+     */
+    private static Object readField(InternalRow row, int pos, String typeName) {
+        boolean isNull = row.isNullAt(pos);
+        switch (typeName) {
+            case Types.TYPE_NAME_BOOLEAN:
+                return isNull ? null : row.getBoolean(pos);
+            case Types.TYPE_NAME_BYTE:
+                return isNull ? null : row.getByte(pos);
+            case Types.TYPE_NAME_DOUBLE:
+                return isNull ? null : row.getDouble(pos);
+            case Types.TYPE_NAME_FLOAT:
+                return isNull ? null : row.getFloat(pos);
+            case Types.TYPE_NAME_INTEGER:
+                return isNull ? null : row.getInt(pos);
+            case Types.TYPE_NAME_LONG:
+                return isNull ? null : row.getLong(pos);
+            case Types.TYPE_NAME_STRING:
+            case Types.TYPE_NAME_BINARY_STRING:
+                return isNull ? null : row.getString(pos);
+            default:
+                throw new GeaFlowDSLException("Type: {} not support", typeName);
+        }
+    }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnector.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnector.java
new file mode 100644
index 00000000..deda9383
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.connector.api.TableReadableConnector;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+
+/**
+ * Readable table connector registered under the type name {@code PAIMON}.
+ */
+public class PaimonTableConnector implements TableReadableConnector {
+
+    /** Type name reported by {@link #getType()}. */
+    private static final String CONNECTOR_TYPE = "PAIMON";
+
+    /**
+     * Creates a new, not yet initialized paimon source.
+     *
+     * @param conf not consulted here; {@link PaimonTableSource} reads its settings in its init method
+     * @return a fresh {@link PaimonTableSource}
+     */
+    @Override
+    public TableSource createSource(Configuration conf) {
+        return new PaimonTableSource();
+    }
+
+    /**
+     * @return the connector type name, {@code "PAIMON"}
+     */
+    @Override
+    public String getType() {
+        return CONNECTOR_TYPE;
+    }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
new file mode 100644
index 00000000..b980db11
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.config.keys.DSLConfigKeys;
+import org.apache.geaflow.common.utils.GsonUtil;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.common.util.Windows;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Offset;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.window.FetchWindow;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Catalog.TableNotExistException;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Table source that reads rows from an Apache Paimon table.
+ *
+ * <p>The catalog is created either from a warehouse path or, when that path is
+ * blank, from JSON-encoded catalog options plus optional JSON-encoded
+ * configuration entries. Each paimon {@link Split} is exposed as one
+ * {@link PaimonPartition}.
+ *
+ * <p>One {@link RecordReader} and one iterator are kept per partition for the
+ * lifetime of the source, so successive window fetches continue where the
+ * previous one stopped. Starting from an arbitrary offset is not supported.
+ */
+public class PaimonTableSource implements TableSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PaimonTableSource.class);
+
+    private Configuration tableConf;
+    private TableSchema tableSchema;
+    private boolean isAllWindow;
+
+    private String path;
+    private Map<String, String> options;
+    private String configJson;
+    private Map<String, String> configs;
+    private String database;
+    private String table;
+
+    private transient CatalogContext catalogContext;
+    private transient Catalog catalog;
+    private transient ReadBuilder readBuilder;
+    private transient Map<PaimonPartition, RecordReader<InternalRow>> partition2Reader;
+    // One iterator per reader: creating a new iterator for every fetch would drop
+    // whatever the previous iterator had already buffered from the reader.
+    private transient Map<PaimonPartition, CloseableIterator<InternalRow>> partition2Iterator;
+    private transient Map<PaimonPartition, PaimonOffset> partition2InnerOffset;
+
+    /**
+     * Reads the connector settings from the table configuration.
+     *
+     * <p>The options JSON and configuration JSON are only read when the
+     * warehouse path is blank.
+     *
+     * @param tableConf   table configuration
+     * @param tableSchema schema of the table being read
+     */
+    @Override
+    public void init(Configuration tableConf, TableSchema tableSchema) {
+        this.tableConf = tableConf;
+        this.isAllWindow =
+            tableConf.getLong(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE) == Windows.SIZE_OF_ALL_WINDOW;
+        this.tableSchema = tableSchema;
+        this.path = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE, "");
+        this.options = new HashMap<>();
+        this.configs = new HashMap<>();
+        if (StringUtils.isBlank(this.path)) {
+            String optionJson = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_OPTIONS_JSON);
+            Map<String, String> userOptions = GsonUtil.parse(optionJson);
+            if (userOptions != null) {
+                options.putAll(userOptions);
+            }
+            this.configJson =
+                tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_CONFIGURATION_JSON, "");
+            if (!StringUtils.isBlank(configJson)) {
+                Map<String, String> userConfig = GsonUtil.parse(configJson);
+                if (userConfig != null) {
+                    configs.putAll(userConfig);
+                }
+            }
+        }
+        this.database = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME);
+        this.table = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME);
+    }
+
+    /**
+     * Lists the partitions; the parallelism hint is ignored.
+     *
+     * @param parallelism unused
+     * @return one partition per paimon split
+     */
+    @Override
+    public List<Partition> listPartitions(int parallelism) {
+        return listPartitions();
+    }
+
+    /**
+     * Creates the paimon catalog, resolves the table and prepares the read builder.
+     *
+     * @param context runtime context; not read here
+     * @throws GeaFlowDSLException if the configured table does not exist
+     */
+    @Override
+    public void open(RuntimeContext context) {
+        if (StringUtils.isBlank(this.path)) {
+            if (StringUtils.isBlank(this.configJson)) {
+                this.catalogContext =
+                    Objects.requireNonNull(CatalogContext.create(new Options(options)));
+            } else {
+                org.apache.hadoop.conf.Configuration hadoopConf =
+                    new org.apache.hadoop.conf.Configuration();
+                for (Map.Entry<String, String> entry : configs.entrySet()) {
+                    hadoopConf.set(entry.getKey(), entry.getValue());
+                }
+                this.catalogContext =
+                    Objects.requireNonNull(CatalogContext.create(new Options(options), hadoopConf));
+            }
+        } else {
+            this.catalogContext = Objects.requireNonNull(CatalogContext.create(new Path(path)));
+        }
+        this.catalog = Objects.requireNonNull(CatalogFactory.createCatalog(this.catalogContext));
+        Identifier identifier = Identifier.create(database, table);
+        try {
+            this.readBuilder = Objects.requireNonNull(catalog.getTable(identifier).newReadBuilder());
+        } catch (TableNotExistException e) {
+            throw new GeaFlowDSLException("Table: {} in db: {} not exists.", table, database);
+        }
+        this.partition2Reader = new HashMap<>();
+        this.partition2Iterator = new HashMap<>();
+        this.partition2InnerOffset = new HashMap<>();
+        LOGGER.info("Open paimon source, tableConf: {}, tableSchema: {}, path: {}, options: "
+                + "{}, configs: {}, database: {}, tableName: {}", tableConf, tableSchema, path,
+            options, configs, database, table);
+    }
+
+    /**
+     * Plans a scan and wraps every resulting split in a {@link PaimonPartition}.
+     *
+     * <p>The read builder is only assigned in {@link #open(RuntimeContext)}, so
+     * this method must not be called before the source has been opened.
+     *
+     * @return one partition per split of a batch scan (all-window mode) or a stream scan
+     */
+    @Override
+    public List<Partition> listPartitions() {
+        List<Split> splits = isAllWindow
+            ? readBuilder.newScan().plan().splits()
+            : readBuilder.newStreamScan().plan().splits();
+        return splits.stream()
+            .map(split -> new PaimonPartition(database, table, split))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @param conf unused
+     * @return a new {@link PaimonRecordDeserializer}
+     */
+    @Override
+    public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
+        return (TableDeserializer<IN>) new PaimonRecordDeserializer();
+    }
+
+    /**
+     * Fetches the next window of records from one partition.
+     *
+     * <p>For an all-window fetch the partition's iterator itself is returned.
+     * For a size-tumbling window at most {@code windowInfo.windowSize()} records
+     * are read, and the partition's tracked offset advances by the number read.
+     *
+     * @param partition   a {@link PaimonPartition} produced by this source
+     * @param startOffset if present, must equal the offset this source tracks for
+     *                    the partition, i.e. the offset returned by the previous fetch
+     * @param windowInfo  window type and size
+     * @return the fetched data together with the next offset
+     * @throws IOException         if the paimon reader cannot be created
+     * @throws GeaFlowDSLException if {@code startOffset} differs from the tracked
+     *                             offset or the window type is not supported
+     */
+    @Override
+    public <T> FetchData fetch(Partition partition, Optional<Offset> startOffset, FetchWindow windowInfo)
+        throws IOException {
+        PaimonPartition paimonPartition = (PaimonPartition) partition;
+        assert paimonPartition.getDatabase().equals(this.database)
+            && paimonPartition.getTable().equals(this.table);
+
+        PaimonOffset innerOffset = partition2InnerOffset.get(paimonPartition);
+        if (innerOffset == null) {
+            innerOffset = new PaimonOffset();
+            partition2InnerOffset.put(paimonPartition, innerOffset);
+        }
+        if (startOffset.isPresent() && !startOffset.get().equals(innerOffset)) {
+            throw new GeaFlowDSLException("Paimon connector not support reset offset.");
+        }
+
+        // Create the reader lazily and only once per partition. Building it as a
+        // getOrDefault argument would open a new reader on every call and leak it.
+        CloseableIterator<InternalRow> iterator = partition2Iterator.get(paimonPartition);
+        if (iterator == null) {
+            RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(paimonPartition.getSplit());
+            partition2Reader.put(paimonPartition, reader);
+            iterator = reader.toCloseableIterator();
+            partition2Iterator.put(paimonPartition, iterator);
+        }
+
+        switch (windowInfo.getType()) {
+            case ALL_WINDOW:
+                return FetchData.createBatchFetch(iterator, new PaimonOffset());
+            case SIZE_TUMBLING_WINDOW:
+                // NOTE(review): rows are collected as returned by the iterator; if the
+                // reader reuses row instances between calls they would need copying.
+                List<Object> readContents = new ArrayList<>();
+                long windowSize = windowInfo.windowSize();
+                long readCount = 0;
+                while (readCount < windowSize && iterator.hasNext()) {
+                    readContents.add(iterator.next());
+                    readCount++;
+                }
+                PaimonOffset nextOffset = new PaimonOffset(innerOffset.getOffset() + readCount);
+                // Remember the new position so the next fetch, which is handed this
+                // offset as its start offset, passes the reset check above.
+                partition2InnerOffset.put(paimonPartition, nextOffset);
+                boolean isFinished = !iterator.hasNext();
+                return FetchData.createStreamFetch(readContents, nextOffset, isFinished);
+            default:
+                throw new GeaFlowDSLException("Paimon not support window:{}", windowInfo.getType());
+        }
+    }
+
+    /**
+     * Closes every reader opened by this source and forgets all per-partition state.
+     *
+     * <p>Safe to call on a source that was never opened.
+     *
+     * @throws GeaFlowDSLException if closing a reader fails
+     */
+    @Override
+    public void close() {
+        if (partition2Reader != null) {
+            for (RecordReader<InternalRow> reader : partition2Reader.values()) {
+                if (reader != null) {
+                    try {
+                        reader.close();
+                    } catch (IOException e) {
+                        throw new GeaFlowDSLException("Error occurs when close paimon reader.", e);
+                    }
+                }
+            }
+            partition2Reader.clear();
+        }
+        if (partition2Iterator != null) {
+            partition2Iterator.clear();
+        }
+        if (partition2InnerOffset != null) {
+            partition2InnerOffset.clear();
+        }
+    }
+
+    /**
+     * Partition backed by a single paimon {@link Split}.
+     *
+     * <p>Equality and hash code are based on database, table and split.
+     */
+    public static class PaimonPartition implements Partition {
+
+        private final String database;
+        private final String table;
+        private final Split split;
+
+        /**
+         * @throws NullPointerException if any argument is {@code null}
+         */
+        public PaimonPartition(String database, String table, Split split) {
+            this.database = Objects.requireNonNull(database);
+            this.table = Objects.requireNonNull(table);
+            this.split = Objects.requireNonNull(split);
+        }
+
+        /**
+         * @return {@code database + "-" + table}
+         */
+        @Override
+        public String getName() {
+            // NOTE(review): the split is not part of the name, so all partitions of one
+            // table share it; confirm callers do not key per-partition state by name.
+            return database + "-" + table;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(database, table, split);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof PaimonPartition)) {
+                return false;
+            }
+            PaimonPartition that = (PaimonPartition) o;
+            return Objects.equals(database, that.database)
+                && Objects.equals(table, that.table)
+                && Objects.equals(split, that.split);
+        }
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public Split getSplit() {
+            return split;
+        }
+
+        /** Does nothing. */
+        @Override
+        public void setIndex(int index, int parallel) {
+        }
+    }
+
+
+    /**
+     * Offset expressed as a plain record count; it is never a timestamp.
+     */
+    public static class PaimonOffset implements Offset {
+
+        private final long offset;
+
+        /** Creates the zero offset. */
+        public PaimonOffset() {
+            this.offset = 0L;
+        }
+
+        public PaimonOffset(long offset) {
+            this.offset = offset;
+        }
+
+        @Override
+        public String humanReadable() {
+            return String.valueOf(offset);
+        }
+
+        @Override
+        public long getOffset() {
+            return offset;
+        }
+
+        @Override
+        public boolean isTimestamp() {
+            return false;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PaimonOffset that = (PaimonOffset) o;
+            return offset == that.offset;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(offset);
+        }
+    }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
new file mode 100644
index 00000000..5b7cbce0
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+org.apache.geaflow.dsl.connector.paimon.PaimonTableConnector
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
new file mode 100644
index 00000000..84746b46
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.config.keys.ConnectorConfigKeys;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.apache.geaflow.dsl.connector.api.TableConnector;
+import org.apache.geaflow.dsl.connector.api.TableReadableConnector;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.util.ConnectorFactory;
+import org.apache.geaflow.dsl.connector.api.window.AllFetchWindow;
+import org.apache.geaflow.runtime.core.context.DefaultRuntimeContext;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.VarCharType;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+/**
+ * Writes five rows into a local paimon table and reads them back through the
+ * paimon connector in all-window mode.
+ */
+public class PaimonTableConnectorTest {
+
+    private static final String WAREHOUSE_DIR = "/tmp/geaflow/dsl/paimon/test/";
+    private static final String DATABASE = "paimon_db";
+    private static final String TABLE = "paimon_table";
+
+    private final StructType dataSchema = new StructType(
+        new TableField("id", Types.INTEGER, false),
+        new TableField("name", Types.BINARY_STRING),
+        new TableField("price", Types.DOUBLE)
+    );
+
+    private final StructType partitionSchema = new StructType(
+        new TableField("dt", Types.BINARY_STRING, false)
+    );
+
+    private final TableSchema tableSchema = new TableSchema(dataSchema, partitionSchema);
+
+
+    /**
+     * Recreates the warehouse directory, creates the database and table, and
+     * commits five records in one batch write.
+     */
+    @BeforeTest
+    public void prepare() {
+        String tmpDir = WAREHOUSE_DIR;
+        FileUtils.deleteQuietly(new File(tmpDir));
+        String db = DATABASE;
+        String tableName = TABLE;
+        CatalogContext catalogContext =
+            Objects.requireNonNull(CatalogContext.create(new Path(tmpDir)));
+        Catalog catalog = Objects.requireNonNull(CatalogFactory.createCatalog(catalogContext));
+        try {
+            catalog.createDatabase(db, false);
+            List<String> dbs = catalog.listDatabases();
+            assert dbs.get(0).equals(db);
+
+            Identifier identifier = new Identifier(db, tableName);
+            Schema schema = Schema.newBuilder()
+                .column("id", new IntType())
+                .column("name", new VarCharType(256))
+                .column("price", new DoubleType())
+                .build();
+            catalog.createTable(identifier, schema, false);
+            List<String> tables = catalog.listTables(dbs.get(0));
+            assert tables.get(0).equals(tableName);
+
+            Table table = catalog.getTable(identifier);
+            BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+            BatchTableWrite write = writeBuilder.newWrite();
+            GenericRow[] records = {
+                GenericRow.of(1, BinaryString.fromString("a1"), 10.0),
+                GenericRow.of(2, BinaryString.fromString("ab"), 12.0),
+                GenericRow.of(3, BinaryString.fromString("a3"), 12.0),
+                GenericRow.of(4, BinaryString.fromString("bcd"), 15.0),
+                GenericRow.of(5, BinaryString.fromString("a5"), 10.0)
+            };
+            for (GenericRow record : records) {
+                write.write(record);
+            }
+            List<CommitMessage> messages = write.prepareCommit();
+            BatchTableCommit commit = writeBuilder.newCommit();
+            commit.commit(messages);
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Test error.", e);
+        }
+    }
+
+    /**
+     * Loads the connector by name, reads every partition with an all-window
+     * fetch and compares the deserialized rows with the rows written in
+     * {@link #prepare()}.
+     */
+    @Test
+    public void testReadPaimon() throws IOException {
+        String tmpDir = WAREHOUSE_DIR;
+        String db = DATABASE;
+        String table = TABLE;
+
+        TableConnector tableConnector = ConnectorFactory.loadConnector("PAIMON");
+        Assert.assertEquals(tableConnector.getType().toLowerCase(Locale.ROOT), "paimon");
+        TableReadableConnector readableConnector = (TableReadableConnector) tableConnector;
+
+        Map<String, String> tableConfMap = new HashMap<>();
+        tableConfMap.put(ConnectorConfigKeys.GEAFLOW_DSL_FILE_PATH.getKey(), tmpDir);
+        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE.getKey(), tmpDir);
+        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME.getKey(), db);
+        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME.getKey(), table);
+        Configuration tableConf = new Configuration(tableConfMap);
+
+        TableSource tableSource = readableConnector.createSource(tableConf);
+        tableSource.init(tableConf, tableSchema);
+        tableSource.open(new DefaultRuntimeContext(tableConf));
+
+        List<Partition> partitions = tableSource.listPartitions();
+
+        TableDeserializer deserializer = tableSource.getDeserializer(tableConf);
+        deserializer.init(tableConf, tableSchema);
+        List<Row> readRows = new ArrayList<>();
+        for (Partition partition : partitions) {
+            FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(),
+                new AllFetchWindow(-1L));
+            while (rows.getDataIterator().hasNext()) {
+                readRows.addAll(deserializer.deserialize(rows.getDataIterator().next()));
+            }
+        }
+
+        String expected = "[1, a1, 10.0]\n"
+            + "[2, ab, 12.0]\n"
+            + "[3, a3, 12.0]\n"
+            + "[4, bcd, 15.0]\n"
+            + "[5, a5, 10.0]";
+        Assert.assertEquals(StringUtils.join(readRows, "\n"), expected);
+    }
+
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
index 9ef2186d..b7c9821a 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
@@ -46,6 +46,7 @@
<module>geaflow-dsl-connector-hbase</module>
<module>geaflow-dsl-connector-pulsar</module>
<module>geaflow-dsl-connector-random</module>
+ <module>geaflow-dsl-connector-paimon</module>
</modules>
<dependencyManagement>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]