This is an automated email from the ASF dual-hosted git repository.

qingwzhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 53766ddd [ISSUE-583] feat: add paimon source connector (#584)
53766ddd is described below

commit 53766ddda61683c6501e7c7068f1daf91cfba370
Author: Qingwen Zhao <[email protected]>
AuthorDate: Tue Aug 12 16:41:48 2025 +0800

    [ISSUE-583] feat: add paimon source connector (#584)
    
    * feat: add paimon source connector
---
 .../geaflow-dsl-connector-paimon/pom.xml           |  44 +++
 .../dsl/connector/paimon/PaimonConfigKeys.java     |  52 ++++
 .../connector/paimon/PaimonRecordDeserializer.java |  85 ++++++
 .../dsl/connector/paimon/PaimonTableConnector.java |  39 +++
 .../dsl/connector/paimon/PaimonTableSource.java    | 312 +++++++++++++++++++++
 ...apache.geaflow.dsl.connector.api.TableConnector |  19 ++
 .../connector/paimon/PaimonTableConnectorTest.java | 168 +++++++++++
 geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml  |   1 +
 8 files changed, 720 insertions(+)

diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/pom.xml
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/pom.xml
new file mode 100644
index 00000000..987592ce
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.geaflow</groupId>
+        <artifactId>geaflow-dsl-connector</artifactId>
+        <version>0.6.8-SNAPSHOT</version>
+    </parent>
+    <artifactId>geaflow-dsl-connector-paimon</artifactId>
+    <name>geaflow-dsl-connector-paimon</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.geaflow</groupId>
+            <artifactId>geaflow-dsl-connector-api</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
new file mode 100644
index 00000000..106ef026
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import org.apache.geaflow.common.config.ConfigKey;
+import org.apache.geaflow.common.config.ConfigKeys;
+
+public class PaimonConfigKeys {
+
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_WAREHOUSE = ConfigKeys
+        .key("geaflow.dsl.paimon.warehouse")
+        .noDefaultValue()
+        .description("The warehouse path for paimon catalog creation.");
+
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_OPTIONS_JSON = ConfigKeys
+        .key("geaflow.dsl.paimon.options.json")
+        .noDefaultValue()
+        .description("The options json for paimon catalog creation.");
+
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_CONFIGURATION_JSON = 
ConfigKeys
+        .key("geaflow.dsl.paimon.configuration.json")
+        .noDefaultValue()
+        .description("The configuration json for paimon catalog creation.");
+
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_DATABASE_NAME = ConfigKeys
+        .key("geaflow.dsl.paimon.database.name")
+        .noDefaultValue()
+        .description("The database name for paimon table.");
+
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_TABLE_NAME = ConfigKeys
+        .key("geaflow.dsl.paimon.table.name")
+        .noDefaultValue()
+        .description("The paimon table name to read.");
+
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
new file mode 100644
index 00000000..d38c360d
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.paimon.data.InternalRow;
+
+/**
+ * Converts Paimon {@link InternalRow} records into GeaFlow {@link Row}s.
+ *
+ * <p>The data schema of the table decides which typed getter is used for each column, so the
+ * Paimon row is expected to have the same column order as the data schema.
+ */
+public class PaimonRecordDeserializer implements TableDeserializer<Object> {
+
+    private StructType schema;
+    private TableSchema tableSchema;
+
+    /**
+     * Stores the table schema and the data schema derived from it.
+     *
+     * @param conf   table configuration; not consulted by this deserializer
+     * @param schema schema of the table; must be a {@link TableSchema}
+     */
+    @Override
+    public void init(Configuration conf, StructType schema) {
+        this.tableSchema = (TableSchema) schema;
+        this.schema = this.tableSchema.getDataSchema();
+    }
+
+    /**
+     * Converts one Paimon record into a single-element list holding the equivalent row.
+     *
+     * @param record the record to convert; must be an {@link InternalRow}
+     * @return a singleton list with one {@link ObjectRow} built from the record
+     * @throws GeaFlowDSLException if a column has a type this deserializer does not handle
+     */
+    @Override
+    public List<Row> deserialize(Object record) {
+        InternalRow internalRow = (InternalRow) record;
+        assert internalRow.getFieldCount() == schema.size();
+        Object[] values = new Object[schema.size()];
+        for (int i = 0; i < schema.size(); i++) {
+            values[i] = readField(internalRow, i);
+        }
+        return Collections.singletonList(ObjectRow.create(values));
+    }
+
+    /** Reads the column at {@code index}, returning {@code null} for a SQL NULL. */
+    private Object readField(InternalRow internalRow, int index) {
+        TableField field = this.schema.getField(index);
+        // The primitive getters cannot represent NULL, so nullable columns must be
+        // checked before the typed read.
+        if (internalRow.isNullAt(index)) {
+            return null;
+        }
+        switch (field.getType().getName()) {
+            case Types.TYPE_NAME_BOOLEAN:
+                return internalRow.getBoolean(index);
+            case Types.TYPE_NAME_BYTE:
+                return internalRow.getByte(index);
+            case Types.TYPE_NAME_DOUBLE:
+                return internalRow.getDouble(index);
+            case Types.TYPE_NAME_FLOAT:
+                return internalRow.getFloat(index);
+            case Types.TYPE_NAME_INTEGER:
+                return internalRow.getInt(index);
+            case Types.TYPE_NAME_LONG:
+                return internalRow.getLong(index);
+            case Types.TYPE_NAME_STRING:
+                // getString yields Paimon's BinaryString; a STRING column carries a Java String.
+                return internalRow.getString(index).toString();
+            case Types.TYPE_NAME_BINARY_STRING:
+                // NOTE(review): this hands out Paimon's BinaryString, not GeaFlow's own binary
+                // string type - confirm downstream operators accept it.
+                return internalRow.getString(index);
+            default:
+                throw new GeaFlowDSLException("Type: {} not support",
+                    field.getType().getName());
+        }
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnector.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnector.java
new file mode 100644
index 00000000..deda9383
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.connector.api.TableReadableConnector;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+
+/**
+ * Readable table connector registered under the {@code PAIMON} type.
+ */
+public class PaimonTableConnector implements TableReadableConnector {
+
+    private static final String CONNECTOR_TYPE = "PAIMON";
+
+    /**
+     * Returns the connector type name.
+     *
+     * @return the constant {@code "PAIMON"}
+     */
+    @Override
+    public String getType() {
+        return CONNECTOR_TYPE;
+    }
+
+    /**
+     * Creates a new {@link PaimonTableSource}.
+     *
+     * @param conf table configuration; not consulted here
+     * @return a fresh source instance
+     */
+    @Override
+    public TableSource createSource(Configuration conf) {
+        final TableSource source = new PaimonTableSource();
+        return source;
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
new file mode 100644
index 00000000..b980db11
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.config.keys.DSLConfigKeys;
+import org.apache.geaflow.common.utils.GsonUtil;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.common.util.Windows;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Offset;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.window.FetchWindow;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Catalog.TableNotExistException;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Table source that reads a Paimon table, exposing every Paimon {@link Split} as one partition.
+ *
+ * <p>The catalog is created either from a warehouse path or, when no path is configured, from
+ * an options JSON plus an optional Hadoop configuration JSON. One record iterator is kept per
+ * partition so that consecutive window fetches continue where the previous one stopped.
+ */
+public class PaimonTableSource implements TableSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PaimonTableSource.class);
+
+    private Configuration tableConf;
+    private TableSchema tableSchema;
+    private boolean isAllWindow;
+
+    private String path;
+    private Map<String, String> options;
+    private String configJson;
+    private Map<String, String> configs;
+    private String database;
+    private String table;
+
+    private transient CatalogContext catalogContext;
+    private transient Catalog catalog;
+    private transient ReadBuilder readBuilder;
+    // One live iterator per partition; closing the iterator also releases its reader.
+    private transient Map<PaimonPartition, CloseableIterator<InternalRow>> partition2Iterator;
+    private transient Map<PaimonPartition, PaimonOffset> partition2InnerOffset;
+
+    /**
+     * Reads the connector settings from the table configuration.
+     *
+     * <p>The options JSON and configuration JSON are only parsed when the warehouse path is
+     * blank; the options JSON is read without a default value.
+     *
+     * @param tableConf   configuration of the table
+     * @param tableSchema schema of the table
+     */
+    @Override
+    public void init(Configuration tableConf, TableSchema tableSchema) {
+        this.tableConf = tableConf;
+        this.isAllWindow =
+            tableConf.getLong(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE) == Windows.SIZE_OF_ALL_WINDOW;
+        this.tableSchema = tableSchema;
+        this.path = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE, "");
+        this.options = new HashMap<>();
+        this.configs = new HashMap<>();
+        if (StringUtils.isBlank(this.path)) {
+            String optionJson = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_OPTIONS_JSON);
+            Map<String, String> userOptions = GsonUtil.parse(optionJson);
+            if (userOptions != null) {
+                options.putAll(userOptions);
+            }
+            this.configJson =
+                tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_CONFIGURATION_JSON, "");
+            if (!StringUtils.isBlank(configJson)) {
+                Map<String, String> userConfig = GsonUtil.parse(configJson);
+                if (userConfig != null) {
+                    configs.putAll(userConfig);
+                }
+            }
+        }
+        this.database = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME);
+        this.table = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME);
+    }
+
+    /**
+     * Lists the partitions; the requested parallelism does not influence the result.
+     *
+     * @param parallelism ignored
+     * @return one partition per split, see {@link #listPartitions()}
+     */
+    @Override
+    public List<Partition> listPartitions(int parallelism) {
+        return listPartitions();
+    }
+
+    /**
+     * Creates the catalog, resolves the table and prepares the per-partition state.
+     *
+     * @param context runtime context; not consulted here
+     * @throws GeaFlowDSLException if the configured table does not exist
+     */
+    @Override
+    public void open(RuntimeContext context) {
+        if (StringUtils.isBlank(this.path)) {
+            if (StringUtils.isBlank(this.configJson)) {
+                this.catalogContext =
+                    Objects.requireNonNull(CatalogContext.create(new Options(options)));
+            } else {
+                org.apache.hadoop.conf.Configuration hadoopConf =
+                    new org.apache.hadoop.conf.Configuration();
+                for (Map.Entry<String, String> entry : configs.entrySet()) {
+                    hadoopConf.set(entry.getKey(), entry.getValue());
+                }
+                this.catalogContext =
+                    Objects.requireNonNull(CatalogContext.create(new Options(options), hadoopConf));
+            }
+        } else {
+            this.catalogContext = Objects.requireNonNull(CatalogContext.create(new Path(path)));
+        }
+        this.catalog = Objects.requireNonNull(CatalogFactory.createCatalog(this.catalogContext));
+        Identifier identifier = Identifier.create(database, table);
+        try {
+            this.readBuilder = Objects.requireNonNull(catalog.getTable(identifier).newReadBuilder());
+        } catch (TableNotExistException e) {
+            // Same message as before, but the original exception is kept as the cause.
+            throw new GeaFlowDSLException(
+                "Table: " + table + " in db: " + database + " not exists.", e);
+        }
+        this.partition2Iterator = new HashMap<>();
+        this.partition2InnerOffset = new HashMap<>();
+        LOGGER.info("Open paimon source, tableConf: {}, tableSchema: {}, path: {}, options: "
+            + "{}, configs: {}, database: {}, tableName: {}", tableConf, tableSchema, path,
+            options, configs, database, table);
+    }
+
+    /**
+     * Plans a scan and wraps every resulting split in a {@link PaimonPartition}.
+     *
+     * <p>A batch scan is planned when the window covers all data, a stream scan otherwise.
+     *
+     * @return one partition per planned split
+     */
+    @Override
+    public List<Partition> listPartitions() {
+        List<Split> splits;
+        if (isAllWindow) {
+            splits = readBuilder.newScan().plan().splits();
+        } else {
+            splits = readBuilder.newStreamScan().plan().splits();
+        }
+        return splits.stream()
+            .map(split -> new PaimonPartition(database, table, split))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns the deserializer for records produced by {@code fetch}.
+     *
+     * @param conf configuration; not consulted here
+     * @return a new {@link PaimonRecordDeserializer}
+     */
+    @Override
+    public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
+        return (TableDeserializer<IN>) new PaimonRecordDeserializer();
+    }
+
+    /**
+     * Fetches the next window of records for a partition.
+     *
+     * <p>For {@code ALL_WINDOW} the partition's iterator is handed out as a batch. For
+     * {@code SIZE_TUMBLING_WINDOW} up to {@code windowSize} records are read, and the
+     * partition's offset advances by the number of records read.
+     *
+     * @param partition   the partition to read; must be a {@link PaimonPartition}
+     * @param startOffset if present, must equal the offset this source tracks for the partition
+     * @param windowInfo  the window to fetch
+     * @return the fetched data together with the next offset
+     * @throws IOException         if the split reader cannot be created
+     * @throws GeaFlowDSLException if the start offset differs from the tracked offset, or the
+     *                             window type is not supported
+     */
+    @Override
+    public <T> FetchData fetch(Partition partition, Optional<Offset> startOffset, FetchWindow windowInfo)
+        throws IOException {
+        PaimonPartition paimonPartition = (PaimonPartition) partition;
+        assert paimonPartition.getDatabase().equals(this.database)
+            && paimonPartition.getTable().equals(this.table);
+
+        PaimonOffset innerOffset = partition2InnerOffset.get(paimonPartition);
+        if (innerOffset == null) {
+            innerOffset = new PaimonOffset();
+            partition2InnerOffset.put(paimonPartition, innerOffset);
+        }
+        if (startOffset.isPresent() && !startOffset.get().equals(innerOffset)) {
+            throw new GeaFlowDSLException("Paimon connector not support reset offset.");
+        }
+
+        CloseableIterator<InternalRow> iterator = getOrCreateIterator(paimonPartition);
+        switch (windowInfo.getType()) {
+            case ALL_WINDOW:
+                return FetchData.createBatchFetch(iterator, new PaimonOffset());
+            case SIZE_TUMBLING_WINDOW:
+                // NOTE(review): the rows are collected without copying - confirm the Paimon
+                // reader does not reuse row instances between next() calls.
+                List<Object> readContents = new ArrayList<>();
+                long i = 0;
+                for (; i < windowInfo.windowSize() && iterator.hasNext(); i++) {
+                    readContents.add(iterator.next());
+                }
+                PaimonOffset nextOffset = new PaimonOffset(innerOffset.getOffset() + i);
+                // Remember the position so the next fetch, which starts at nextOffset,
+                // is not rejected as an offset reset.
+                partition2InnerOffset.put(paimonPartition, nextOffset);
+                boolean isFinished = !iterator.hasNext();
+                return FetchData.createStreamFetch(readContents, nextOffset, isFinished);
+            default:
+                throw new GeaFlowDSLException("Paimon not support window:{}", windowInfo.getType());
+        }
+    }
+
+    /**
+     * Returns the partition's iterator, creating the reader and iterator on first use only.
+     *
+     * <p>Reusing one iterator keeps the read position between fetches and avoids opening a
+     * new reader per call.
+     */
+    private CloseableIterator<InternalRow> getOrCreateIterator(PaimonPartition partition)
+        throws IOException {
+        CloseableIterator<InternalRow> iterator = partition2Iterator.get(partition);
+        if (iterator != null) {
+            return iterator;
+        }
+        RecordReader<InternalRow> reader = readBuilder.newRead().createReader(partition.getSplit());
+        try {
+            iterator = reader.toCloseableIterator();
+        } catch (RuntimeException e) {
+            // Do not leak the reader when the iterator cannot be created.
+            try {
+                reader.close();
+            } catch (IOException closeError) {
+                e.addSuppressed(closeError);
+            }
+            throw e;
+        }
+        partition2Iterator.put(partition, iterator);
+        return iterator;
+    }
+
+    /**
+     * Closes every open iterator and the catalog, then clears the per-partition state.
+     *
+     * <p>All resources are attempted even if one fails; the first failure is thrown with the
+     * later ones attached as suppressed exceptions. Calling this before {@code open} is a no-op.
+     *
+     * @throws GeaFlowDSLException if any resource fails to close
+     */
+    @Override
+    public void close() {
+        GeaFlowDSLException failure = null;
+        if (partition2Iterator != null) {
+            for (CloseableIterator<InternalRow> iterator : partition2Iterator.values()) {
+                if (iterator == null) {
+                    continue;
+                }
+                try {
+                    iterator.close();
+                } catch (Exception e) {
+                    if (failure == null) {
+                        failure = new GeaFlowDSLException("Error occurs when close paimon reader.", e);
+                    } else {
+                        failure.addSuppressed(e);
+                    }
+                }
+            }
+            partition2Iterator.clear();
+        }
+        if (partition2InnerOffset != null) {
+            partition2InnerOffset.clear();
+        }
+        if (catalog != null) {
+            try {
+                catalog.close();
+            } catch (Exception e) {
+                if (failure == null) {
+                    failure = new GeaFlowDSLException("Error occurs when close paimon catalog.", e);
+                } else {
+                    failure.addSuppressed(e);
+                }
+            }
+            catalog = null;
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * A partition backed by exactly one Paimon {@link Split} of a table.
+     */
+    public static class PaimonPartition implements Partition {
+
+        private final String database;
+        private final String table;
+        private final Split split;
+
+        /**
+         * @param database database name; must not be null
+         * @param table    table name; must not be null
+         * @param split    the split to read; must not be null
+         */
+        public PaimonPartition(String database, String table, Split split) {
+            this.database = Objects.requireNonNull(database);
+            this.table = Objects.requireNonNull(table);
+            this.split = Objects.requireNonNull(split);
+        }
+
+        /**
+         * Returns {@code database-table}.
+         *
+         * <p>NOTE(review): all splits of one table share this name - confirm callers do not
+         * rely on partition names being unique.
+         */
+        @Override
+        public String getName() {
+            return database + "-" + table;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(database, table, split);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof PaimonPartition)) {
+                return false;
+            }
+            PaimonPartition that = (PaimonPartition) o;
+            return Objects.equals(database, that.database)
+                && Objects.equals(table, that.table)
+                && Objects.equals(split, that.split);
+        }
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public Split getSplit() {
+            return split;
+        }
+
+        /** Intentionally a no-op: the index and parallelism are not used by this partition. */
+        @Override
+        public void setIndex(int index, int parallel) {
+        }
+    }
+
+    /**
+     * Offset of a partition, counted as the number of records already fetched.
+     */
+    public static class PaimonOffset implements Offset {
+
+        private final long offset;
+
+        /** Creates the initial offset, {@code 0}. */
+        public PaimonOffset() {
+            this.offset = 0L;
+        }
+
+        public PaimonOffset(long offset) {
+            this.offset = offset;
+        }
+
+        @Override
+        public String humanReadable() {
+            return String.valueOf(offset);
+        }
+
+        @Override
+        public long getOffset() {
+            return offset;
+        }
+
+        @Override
+        public boolean isTimestamp() {
+            return false;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PaimonOffset that = (PaimonOffset) o;
+            return offset == that.offset;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(offset);
+        }
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
new file mode 100644
index 00000000..5b7cbce0
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+org.apache.geaflow.dsl.connector.paimon.PaimonTableConnector
\ No newline at end of file
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
new file mode 100644
index 00000000..84746b46
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.config.keys.ConnectorConfigKeys;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.apache.geaflow.dsl.connector.api.TableConnector;
+import org.apache.geaflow.dsl.connector.api.TableReadableConnector;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.util.ConnectorFactory;
+import org.apache.geaflow.dsl.connector.api.window.AllFetchWindow;
+import org.apache.geaflow.runtime.core.context.DefaultRuntimeContext;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.VarCharType;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+public class PaimonTableConnectorTest {
+
+    private final StructType dataSchema = new StructType(
+        new TableField("id", Types.INTEGER, false),
+        new TableField("name", Types.BINARY_STRING),
+        new TableField("price", Types.DOUBLE)
+    );
+
+    private final StructType partitionSchema = new StructType(
+        new TableField("dt", Types.BINARY_STRING, false)
+    );
+
+    private final TableSchema tableSchema = new TableSchema(dataSchema, 
partitionSchema);
+
+
+    @BeforeTest
+    public void prepare() {
+        String tmpDir = "/tmp/geaflow/dsl/paimon/test/";
+        FileUtils.deleteQuietly(new File(tmpDir));
+        String db = "paimon_db";
+        String tableName = "paimon_table";
+        CatalogContext catalogContext =
+            Objects.requireNonNull(CatalogContext.create(new Path(tmpDir)));
+        Catalog catalog = 
Objects.requireNonNull(CatalogFactory.createCatalog(catalogContext));
+        try {
+            catalog.createDatabase(db, false);
+            List<String> dbs = catalog.listDatabases();
+            assert dbs.get(0).equals(db);
+            Identifier identifier = new Identifier(db, tableName);
+            catalog.createTable(identifier,
+                Schema.newBuilder()
+                    .column("id", new IntType())
+                    .column("name", new VarCharType(256))
+                    .column("price", new DoubleType())
+                    .build(), false);
+            List<String> tables = catalog.listTables(dbs.get(0));
+            assert tables.get(0).equals(tableName);
+            Table table = catalog.getTable(identifier);
+            BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+            BatchTableWrite write = writeBuilder.newWrite();
+            GenericRow record1 = GenericRow.of(1, 
BinaryString.fromString("a1"), 10.0);
+            GenericRow record2 = GenericRow.of(2, 
BinaryString.fromString("ab"), 12.0);
+            GenericRow record3 = GenericRow.of(3, 
BinaryString.fromString("a3"), 12.0);
+            GenericRow record4 = GenericRow.of(4, 
BinaryString.fromString("bcd"), 15.0);
+            GenericRow record5 = GenericRow.of(5, 
BinaryString.fromString("a5"), 10.0);
+            write.write(record1);
+            write.write(record2);
+            write.write(record3);
+            write.write(record4);
+            write.write(record5);
+            List<CommitMessage> messages = write.prepareCommit();
+            BatchTableCommit commit = writeBuilder.newCommit();
+            commit.commit(messages);
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Test error.", e);
+        }
+    }
+
+    @Test
+    public void testReadPaimon() throws IOException {
+        String tmpDir = "/tmp/geaflow/dsl/paimon/test/";
+        String db = "paimon_db";
+        String table = "paimon_table";
+
+        TableConnector tableConnector = 
ConnectorFactory.loadConnector("PAIMON");
+        Assert.assertEquals(tableConnector.getType().toLowerCase(Locale.ROOT), 
"paimon");
+        TableReadableConnector readableConnector = (TableReadableConnector) 
tableConnector;
+
+        Map<String, String> tableConfMap = new HashMap<>();
+        tableConfMap.put(ConnectorConfigKeys.GEAFLOW_DSL_FILE_PATH.getKey(), 
tmpDir);
+        
tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE.getKey(), 
tmpDir);
+        
tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME.getKey(), 
db);
+        
tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME.getKey(), 
table);
+        Configuration tableConf = new Configuration(tableConfMap);
+        TableSource tableSource = readableConnector.createSource(tableConf);
+        tableSource.init(tableConf, tableSchema);
+
+        tableSource.open(new DefaultRuntimeContext(tableConf));
+
+        List<Partition> partitions = tableSource.listPartitions();
+
+        TableDeserializer deserializer = 
tableSource.getDeserializer(tableConf);
+        deserializer.init(tableConf, tableSchema);
+        List<Row> readRows = new ArrayList<>();
+        for (Partition partition : partitions) {
+            FetchData<Object> rows = tableSource.fetch(partition, 
Optional.empty(), new AllFetchWindow(-1L));
+            while (rows.getDataIterator().hasNext()) {
+                
readRows.addAll(deserializer.deserialize(rows.getDataIterator().next()));
+            }
+        }
+        Assert.assertEquals(StringUtils.join(readRows, "\n"),
+            "[1, a1, 10.0]\n"
+                + "[2, ab, 12.0]\n"
+                + "[3, a3, 12.0]\n"
+                + "[4, bcd, 15.0]\n"
+                + "[5, a5, 10.0]");
+    }
+
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
index 9ef2186d..b7c9821a 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
@@ -46,6 +46,7 @@
         <module>geaflow-dsl-connector-hbase</module>
         <module>geaflow-dsl-connector-pulsar</module>
         <module>geaflow-dsl-connector-random</module>
+        <module>geaflow-dsl-connector-paimon</module>
     </modules>
 
     <dependencyManagement>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to