This is an automated email from the ASF dual-hosted git repository.

loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 13618a5c3 feat: add neo4j and es connector (#653)
13618a5c3 is described below

commit 13618a5c30ad6be1ca0cea003203d46d3791aa20
Author: shown <[email protected]>
AuthorDate: Tue Dec 2 19:51:52 2025 +0800

    feat: add neo4j and es connector (#653)
    
    * t# This is a combination of 2 commits.
    
    feat: add neo4j connector
    
    Signed-off-by: jishiwen.jsw <[email protected]>
    
    * fix: adapter jdk 8
    
    Signed-off-by: jishiwen.jsw <[email protected]>
    
    * feat: update by comment
    
    Signed-off-by: jishiwen.jsw <[email protected]>
    
    * fix: fix checkstyle errror
    
    Signed-off-by: jishiwen.jsw <[email protected]>
    
    * fix
    
    Signed-off-by: jishiwen.jsw <[email protected]>
    
    ---------
    
    Signed-off-by: jishiwen.jsw <[email protected]>
---
 .../geaflow-dsl-connector-elasticsearch/pom.xml    |  61 +++++
 .../elasticsearch/ElasticsearchConfigKeys.java     |  71 +++++
 .../elasticsearch/ElasticsearchConstants.java      |  44 +++
 .../elasticsearch/ElasticsearchTableConnector.java |  46 ++++
 .../elasticsearch/ElasticsearchTableSink.java      | 214 +++++++++++++++
 .../elasticsearch/ElasticsearchTableSource.java    | 270 ++++++++++++++++++
 ...apache.geaflow.dsl.connector.api.TableConnector |  19 ++
 .../elasticsearch/ElasticsearchConfigKeysTest.java |  67 +++++
 .../ElasticsearchTableConnectorTest.java           |  92 +++++++
 .../elasticsearch/ElasticsearchTableSinkTest.java  | 100 +++++++
 .../ElasticsearchTableSourceTest.java              |  98 +++++++
 .../geaflow-dsl-connector-neo4j/pom.xml            |  62 +++++
 .../dsl/connector/neo4j/Neo4jConfigKeys.java       | 109 ++++++++
 .../dsl/connector/neo4j/Neo4jConstants.java        |  39 +++
 .../dsl/connector/neo4j/Neo4jTableConnector.java   |  46 ++++
 .../dsl/connector/neo4j/Neo4jTableSink.java        | 301 +++++++++++++++++++++
 .../dsl/connector/neo4j/Neo4jTableSource.java      | 300 ++++++++++++++++++++
 ...apache.geaflow.dsl.connector.api.TableConnector |  20 ++
 .../dsl/connector/neo4j/Neo4jConfigKeysTest.java   |  79 ++++++
 .../connector/neo4j/Neo4jTableConnectorTest.java   |  81 ++++++
 geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml  |   2 +
 21 files changed, 2121 insertions(+)

diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml
new file mode 100644
index 000000000..8cf12e3bd
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements.  See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership.  The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License.  You may obtain a copy of the License at
+ ~
+ ~   http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied.  See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>org.apache.geaflow</groupId>
+        <artifactId>geaflow-dsl-connector</artifactId>
+        <version>0.6.8-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>geaflow-dsl-connector-elasticsearch</artifactId>
+    <name>geaflow-dsl-connector-elasticsearch</name>
+
+    <properties>
+        <elasticsearch.version>7.17.10</elasticsearch.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.geaflow</groupId>
+            <artifactId>geaflow-dsl-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.geaflow</groupId>
+            <artifactId>geaflow-dsl-connector-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <version>${elasticsearch.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>${testng.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeys.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeys.java
new file mode 100644
index 000000000..65df4b4cd
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeys.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import org.apache.geaflow.common.config.ConfigKey;
+import org.apache.geaflow.common.config.ConfigKeys;
+
+public class ElasticsearchConfigKeys {
+
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_HOSTS = ConfigKeys
+            .key("geaflow.dsl.elasticsearch.hosts")
+            .noDefaultValue()
+            .description("Elasticsearch cluster hosts list.");
+
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_INDEX = ConfigKeys
+            .key("geaflow.dsl.elasticsearch.index")
+            .noDefaultValue()
+            .description("Elasticsearch index name.");
+
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_DOCUMENT_ID_FIELD 
= ConfigKeys
+            .key("geaflow.dsl.elasticsearch.document.id.field")
+            .noDefaultValue()
+            .description("Elasticsearch document id field.");
+
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_USERNAME = 
ConfigKeys
+            .key("geaflow.dsl.elasticsearch.username")
+            .noDefaultValue()
+            .description("Elasticsearch username for authentication.");
+
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_PASSWORD = 
ConfigKeys
+            .key("geaflow.dsl.elasticsearch.password")
+            .noDefaultValue()
+            .description("Elasticsearch password for authentication.");
+
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_BATCH_SIZE = 
ConfigKeys
+            .key("geaflow.dsl.elasticsearch.batch.size")
+            .defaultValue("1000")
+            .description("Elasticsearch batch write size.");
+
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_SCROLL_TIMEOUT = 
ConfigKeys
+            .key("geaflow.dsl.elasticsearch.scroll.timeout")
+            .defaultValue("60s")
+            .description("Elasticsearch scroll query timeout.");
+
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_CONNECTION_TIMEOUT 
= ConfigKeys
+            .key("geaflow.dsl.elasticsearch.connection.timeout")
+            .defaultValue("1000")
+            .description("Elasticsearch connection timeout in milliseconds.");
+
+    public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT = 
ConfigKeys
+            .key("geaflow.dsl.elasticsearch.socket.timeout")
+            .defaultValue("30000")
+            .description("Elasticsearch socket timeout in milliseconds.");
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConstants.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConstants.java
new file mode 100644
index 000000000..ed8c7adac
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+public class ElasticsearchConstants {
+
+    public static final int DEFAULT_BATCH_SIZE = 1000;
+
+    public static final String DEFAULT_SCROLL_TIMEOUT = "60s";
+
+    public static final int DEFAULT_CONNECTION_TIMEOUT = 1000;
+
+    public static final int DEFAULT_SOCKET_TIMEOUT = 30000;
+
+    public static final int DEFAULT_SEARCH_SIZE = 1000;
+
+    public static final String ES_SCHEMA_SUFFIX = "://";
+
+    public static final String ES_HTTP_SCHEME = "http";
+
+    public static final String ES_HTTPS_SCHEME = "https";
+
+    public static final String ES_SPLIT_COMMA = ",";
+
+    public static final String ES_SPLIT_COLON = ";";
+
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnector.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnector.java
new file mode 100644
index 000000000..f8950a8d0
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.connector.api.TableReadableConnector;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.TableWritableConnector;
+
+public class ElasticsearchTableConnector implements TableReadableConnector, 
TableWritableConnector {
+
+    public static final String TYPE = "ELASTICSEARCH";
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    @Override
+    public TableSource createSource(Configuration conf) {
+        return new ElasticsearchTableSource();
+    }
+
+    @Override
+    public TableSink createSink(Configuration conf) {
+        return new ElasticsearchTableSink();
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSink.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSink.java
new file mode 100644
index 000000000..4831a6c21
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSink.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.xcontent.XContentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchTableSink implements TableSink {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchTableSink.class);
+    private static final Gson GSON = new Gson();
+
+    private StructType schema;
+    private String hosts;
+    private String indexName;
+    private String documentIdField;
+    private String username;
+    private String password;
+    private int batchSize;
+    private int connectionTimeout;
+    private int socketTimeout;
+
+    private RestHighLevelClient client;
+    private BulkRequest bulkRequest;
+    private int batchCounter = 0;
+
+    @Override
+    public void init(Configuration conf, StructType schema) {
+        LOGGER.info("Prepare with config: {}, \n schema: {}", conf, schema);
+        this.schema = schema;
+
+        this.hosts = 
conf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS);
+        this.indexName = 
conf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX);
+        this.documentIdField = 
conf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_DOCUMENT_ID_FIELD,
 "");
+        this.username = 
conf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, "");
+        this.password = 
conf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, "");
+        this.batchSize = 
conf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BATCH_SIZE,
+                ElasticsearchConstants.DEFAULT_BATCH_SIZE);
+        this.connectionTimeout = 
conf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECTION_TIMEOUT,
+                ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT);
+        this.socketTimeout = 
conf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT,
+                ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT);
+    }
+
+    @Override
+    public void open(RuntimeContext context) {
+        try {
+            this.client = createElasticsearchClient();
+            this.bulkRequest = new BulkRequest();
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to create Elasticsearch 
client", e);
+        }
+    }
+
+    @Override
+    public void write(Row row) throws IOException {
+        // Convert row to JSON document
+        String jsonDocument = rowToJson(row);
+
+        // Create index request
+        IndexRequest request = new IndexRequest(indexName);
+        request.source(jsonDocument, XContentType.JSON);
+
+        // Set document ID if specified
+        if (documentIdField != null && !documentIdField.isEmpty()) {
+            int idFieldIndex = schema.indexOf(documentIdField);
+            if (idFieldIndex >= 0) {
+                Object idValue = row.getField(idFieldIndex, 
schema.getType(idFieldIndex));
+                if (idValue != null) {
+                    request.id(idValue.toString());
+                }
+            }
+        }
+
+        // Add to bulk request
+        bulkRequest.add(request);
+        batchCounter++;
+
+        // Flush if batch size reached
+        if (batchCounter >= batchSize) {
+            flush();
+        }
+    }
+
+    @Override
+    public void finish() throws IOException {
+        flush();
+    }
+
+    @Override
+    public void close() {
+        try {
+            if (Objects.nonNull(this.client)) {
+                client.close();
+            }
+        } catch (IOException e) {
+            throw new GeaFlowDSLException("Failed to close Elasticsearch 
client", e);
+        }
+    }
+
+    private void flush() throws IOException {
+        if (batchCounter > 0 && client != null) {
+            BulkResponse bulkResponse = client.bulk(bulkRequest, 
RequestOptions.DEFAULT);
+            if (bulkResponse.hasFailures()) {
+                LOGGER.error("Bulk request failed: {}", 
bulkResponse.buildFailureMessage());
+                throw new IOException("Bulk request failed: " + 
bulkResponse.buildFailureMessage());
+            }
+            bulkRequest = new BulkRequest();
+            batchCounter = 0;
+        }
+    }
+
+    private String rowToJson(Row row) {
+        // Convert Row to JSON string
+        Map<String, Object> map = new HashMap<>();
+        List<String> fieldNames = schema.getFieldNames();
+
+        for (int i = 0; i < fieldNames.size(); i++) {
+            String fieldName = fieldNames.get(i);
+            Object fieldValue = row.getField(i, schema.getType(i));
+            map.put(fieldName, fieldValue);
+        }
+
+        return GSON.toJson(map);
+    }
+
+    private RestHighLevelClient createElasticsearchClient() {
+        try {
+            String[] hostArray = hosts.split(",");
+            HttpHost[] httpHosts = new HttpHost[hostArray.length];
+
+            for (int i = 0; i < hostArray.length; i++) {
+                String host = hostArray[i].trim();
+                if (host.startsWith("http://";)) {
+                    host = host.substring(7);
+                } else if (host.startsWith("https://";)) {
+                    host = host.substring(8);
+                }
+
+                String[] parts = host.split(":");
+                String hostname = parts[0];
+                int port = parts.length > 1 ? Integer.parseInt(parts[1]) : 
9200;
+                httpHosts[i] = new HttpHost(hostname, port, "http");
+            }
+
+            RestClientBuilder builder = RestClient.builder(httpHosts);
+
+            // Configure timeouts
+            builder.setRequestConfigCallback(requestConfigBuilder -> {
+                requestConfigBuilder.setConnectTimeout(connectionTimeout);
+                requestConfigBuilder.setSocketTimeout(socketTimeout);
+                return requestConfigBuilder;
+            });
+
+            // Configure authentication if provided
+            if (username != null && !username.isEmpty() && password != null) {
+                final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+                credentialsProvider.setCredentials(AuthScope.ANY,
+                        new UsernamePasswordCredentials(username, password));
+
+                builder.setHttpClientConfigCallback(httpClientBuilder -> {
+                    
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                    return httpClientBuilder;
+                });
+            }
+
+            return new RestHighLevelClient(builder);
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to create Elasticsearch 
client", e);
+        }
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java
new file mode 100644
index 000000000..6fed8c37c
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import static 
org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.DEFAULT_SEARCH_SIZE;
+import static 
org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_HTTPS_SCHEME;
+import static 
org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_HTTP_SCHEME;
+import static 
org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SCHEMA_SUFFIX;
+import static 
org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SPLIT_COLON;
+import static 
org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SPLIT_COMMA;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Offset;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.window.FetchWindow;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.search.Scroll;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchTableSource implements TableSource {
+
+    private static final Gson GSON = new Gson();
+    private static final Type MAP_TYPE = new TypeToken<Map<String, 
Object>>(){}.getType();
+
+    private Logger logger = 
LoggerFactory.getLogger(ElasticsearchTableSource.class);
+
+    private StructType schema;
+    private String hosts;
+    private String indexName;
+    private String username;
+    private String password;
+    private String scrollTimeout;
+    private int connectionTimeout;
+    private int socketTimeout;
+
+    private RestHighLevelClient client;
+
+    @Override
+    public void init(Configuration tableConf, TableSchema tableSchema) {
+        this.schema = tableSchema;
+        this.hosts = 
tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS);
+        this.indexName = 
tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX);
+        this.username = 
tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, 
"");
+        this.password = 
tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, 
"");
+        this.scrollTimeout = 
tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SCROLL_TIMEOUT,
+                ElasticsearchConstants.DEFAULT_SCROLL_TIMEOUT);
+        this.connectionTimeout = 
tableConf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECTION_TIMEOUT,
+                ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT);
+        this.socketTimeout = 
tableConf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT,
+                ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT);
+    }
+
+    @Override
+    public void open(RuntimeContext context) {
+        try {
+            this.client = createElasticsearchClient();
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to initialize Elasticsearch 
client", e);
+        }
+    }
+
+    @Override
+    public List<Partition> listPartitions() {
+        return Collections.singletonList(new 
ElasticsearchPartition(indexName));
+    }
+
+    @Override
+    public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
+        return new TableDeserializer<IN>() {
+            @Override
+            public void init(Configuration configuration, StructType 
structType) {
+                // Initialization if needed
+            }
+
+            @Override
+            public List<Row> deserialize(IN record) {
+                if (record instanceof SearchHit) {
+                    SearchHit hit = (SearchHit) record;
+                    Map<String, Object> source = hit.getSourceAsMap();
+                    if (source == null) {
+                        source = GSON.fromJson(hit.getSourceAsString(), 
MAP_TYPE);
+                    }
+
+                    // Convert map to Row based on schema
+                    Object[] values = new Object[schema.size()];
+                    for (int i = 0; i < schema.size(); i++) {
+                        String fieldName = schema.getFields().get(i).getName();
+                        values[i] = source.get(fieldName);
+                    }
+                    Row row = ObjectRow.create(values);
+                    return Collections.singletonList(row);
+                }
+                return Collections.emptyList();
+            }
+        };
+    }
+
+    @Override
+    public <T> FetchData<T> fetch(Partition partition, Optional<Offset> 
startOffset,
+                                  FetchWindow windowInfo) throws IOException {
+        try {
+            SearchRequest searchRequest = new SearchRequest(indexName);
+            SearchSourceBuilder searchSourceBuilder = new 
SearchSourceBuilder();
+            searchSourceBuilder.size(DEFAULT_SEARCH_SIZE); // Batch size
+
+            searchRequest.source(searchSourceBuilder);
+
+            // Use scroll for large dataset reading
+            Scroll scroll = new Scroll(TimeValue.parseTimeValue(scrollTimeout, 
"scroll_timeout"));
+            searchRequest.scroll(scroll);
+
+            SearchResponse searchResponse = client.search(searchRequest, 
RequestOptions.DEFAULT);
+            String scrollId = searchResponse.getScrollId();
+            SearchHit[] searchHits = searchResponse.getHits().getHits();
+
+            List<T> dataList = new ArrayList<>();
+            for (SearchHit hit : searchHits) {
+                dataList.add((T) hit);
+            }
+
+            // Clear scroll
+            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+            clearScrollRequest.addScrollId(scrollId);
+            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
+
+            ElasticsearchOffset nextOffset = new ElasticsearchOffset(scrollId);
+            return (FetchData<T>) FetchData.createStreamFetch(dataList, 
nextOffset, false);
+        } catch (Exception e) {
+            throw new IOException("Failed to fetch data from Elasticsearch", 
e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            if (client != null) {
+                client.close();
+            }
+        } catch (IOException e) {
+            // Log error but don't throw exception in close method
+            logger.warn("Failed to close Elasticsearch client", e);
+        }
+    }
+
+    private RestHighLevelClient createElasticsearchClient() {
+        try {
+            String[] hostArray = hosts.split(ES_SPLIT_COMMA);
+            HttpHost[] httpHosts = new HttpHost[hostArray.length];
+
+            for (int i = 0; i < hostArray.length; i++) {
+                String host = hostArray[i].trim();
+                if (host.startsWith(ES_HTTP_SCHEME + ES_SCHEMA_SUFFIX)) {
+                    host = host.substring(7);
+                } else if (host.startsWith(ES_HTTPS_SCHEME + 
ES_SCHEMA_SUFFIX)) {
+                    host = host.substring(8);
+                }
+
+                String[] parts = host.split(ES_SPLIT_COLON);
+                String hostname = parts[0];
+                int port = parts.length > 1 ? Integer.parseInt(parts[1]) : 
9200;
+                httpHosts[i] = new HttpHost(hostname, port, ES_HTTP_SCHEME);
+            }
+
+            RestClientBuilder builder = RestClient.builder(httpHosts);
+
+            // Configure timeouts
+            builder.setRequestConfigCallback(requestConfigBuilder -> {
+                requestConfigBuilder.setConnectTimeout(connectionTimeout);
+                requestConfigBuilder.setSocketTimeout(socketTimeout);
+                return requestConfigBuilder;
+            });
+
+            return new RestHighLevelClient(builder);
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to create Elasticsearch 
client", e);
+        }
+    }
+
+    public static class ElasticsearchPartition implements Partition {
+        private final String indexName;
+
+        public ElasticsearchPartition(String indexName) {
+            this.indexName = indexName;
+        }
+
+        @Override
+        public String getName() {
+            return indexName;
+        }
+    }
+
+    public static class ElasticsearchOffset implements Offset {
+        private final String scrollId;
+        private final long timestamp;
+
+        public ElasticsearchOffset(String scrollId) {
+            this(scrollId, System.currentTimeMillis());
+        }
+
+        public ElasticsearchOffset(String scrollId, long timestamp) {
+            this.scrollId = scrollId;
+            this.timestamp = timestamp;
+        }
+
+        public String getScrollId() {
+            return scrollId;
+        }
+
+        @Override
+        public String humanReadable() {
+            return "ElasticsearchOffset{scrollId='" + scrollId + "', 
timestamp=" + timestamp + "}";
+        }
+
+        @Override
+        public long getOffset() {
+            return timestamp;
+        }
+
+        @Override
+        public boolean isTimestamp() {
+            return true;
+        }
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
new file mode 100644
index 000000000..93f5ad880
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchTableConnector
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeysTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeysTest.java
new file mode 100644
index 000000000..0eaf9cf91
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeysTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import org.apache.geaflow.common.config.Configuration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ElasticsearchConfigKeysTest {
+
+    @Test
+    public void testConfigKeys() {
+        Configuration config = new Configuration();
+
+        config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, 
"localhost:9200");
+        config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, 
"test_index");
+        config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, 
"elastic");
+
+        
Assert.assertEquals(config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS),
+                (Object) "localhost:9200");
+        
Assert.assertEquals(config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX),
+                (Object) "test_index");
+        
Assert.assertEquals(config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME),
+                (Object) "elastic");
+    }
+
+    @Test
+    public void testDefaultValues() {
+        Configuration config = new Configuration();
+
+        String batchSize = 
config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BATCH_SIZE);
+        String scrollTimeout = 
config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SCROLL_TIMEOUT);
+
+        Assert.assertEquals(batchSize, (Object) 
String.valueOf(ElasticsearchConstants.DEFAULT_BATCH_SIZE));
+        Assert.assertEquals(scrollTimeout, (Object) 
ElasticsearchConstants.DEFAULT_SCROLL_TIMEOUT);
+    }
+
+    @Test
+    public void testTimeoutValues() {
+        Configuration config = new Configuration();
+
+        String connectionTimeout = 
config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECTION_TIMEOUT);
+        String socketTimeout = 
config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT);
+
+        Assert.assertEquals(connectionTimeout,
+                (Object) 
String.valueOf(ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT));
+        Assert.assertEquals(socketTimeout,
+                (Object) 
String.valueOf(ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT));
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java
new file mode 100644
index 000000000..a5f594b13
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ElasticsearchTableConnectorTest {
+
+    private ElasticsearchTableConnector connector;
+    private Configuration config;
+    private TableSchema schema;
+
+    @BeforeMethod
+    public void setUp() {
+        connector = new ElasticsearchTableConnector();
+        config = new Configuration();
+        config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, 
"localhost:9200");
+        config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, 
"test_index");
+        
config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_DOCUMENT_ID_FIELD, 
"id");
+
+        TableField idField = new TableField("id", Types.INTEGER, false);
+        TableField nameField = new TableField("name", Types.STRING, false);
+        schema = new TableSchema(new StructType(Arrays.asList(idField, 
nameField)));
+    }
+
+    @Test
+    public void testGetName() {
+        Assert.assertEquals(connector.getType(), "ELASTICSEARCH");
+    }
+
+    @Test
+    public void testGetSource() {
+        TableSource source = connector.createSource(config);
+        Assert.assertNotNull(source);
+        Assert.assertTrue(source instanceof ElasticsearchTableSource);
+    }
+
+    @Test
+    public void testGetSink() {
+        TableSink sink = connector.createSink(config);
+        Assert.assertNotNull(sink);
+        Assert.assertTrue(sink instanceof ElasticsearchTableSink);
+    }
+
+    @Test
+    public void testMultipleSourceInstances() {
+        TableSource source1 = connector.createSource(config);
+        TableSource source2 = connector.createSource(config);
+
+        Assert.assertNotNull(source1);
+        Assert.assertNotNull(source2);
+        Assert.assertNotSame(source1, source2);
+    }
+
+    @Test
+    public void testMultipleSinkInstances() {
+        TableSink sink1 = connector.createSink(config);
+        TableSink sink2 = connector.createSink(config);
+
+        Assert.assertNotNull(sink1);
+        Assert.assertNotNull(sink2);
+        Assert.assertNotSame(sink1, sink2);
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSinkTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSinkTest.java
new file mode 100644
index 000000000..942839522
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSinkTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ElasticsearchTableSinkTest {
+
+    private ElasticsearchTableSink sink;
+    private Configuration config;
+    private StructType schema;
+
+    @BeforeMethod
+    public void setUp() {
+        sink = new ElasticsearchTableSink();
+        config = new Configuration();
+        config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, 
"localhost:9200");
+        config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, 
"test_index");
+        
config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_DOCUMENT_ID_FIELD, 
"id");
+
+        TableField idField = new TableField("id", Types.INTEGER, false);
+        TableField nameField = new TableField("name", Types.STRING, false);
+        TableField ageField = new TableField("age", Types.INTEGER, false);
+        schema = new StructType(Arrays.asList(idField, nameField, ageField));
+    }
+
+    @Test
+    public void testInit() {
+        sink.init(config, schema);
+        Assert.assertNotNull(sink);
+    }
+
+    @Test(expectedExceptions = RuntimeException.class)
+    public void testInitWithoutIndex() {
+        Configuration invalidConfig = new Configuration();
+        
invalidConfig.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, 
"localhost:9200");
+        sink.init(invalidConfig, schema);
+    }
+
+    @Test
+    public void testInitWithoutIdField() {
+        Configuration invalidConfig = new Configuration();
+        
invalidConfig.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, 
"localhost:9200");
+        
invalidConfig.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, 
"test_index");
+        sink.init(invalidConfig, schema);
+        Assert.assertNotNull(sink);
+    }
+
+    @Test
+    public void testBatchSizeConfiguration() {
+        
config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BATCH_SIZE, "500");
+        sink.init(config, schema);
+        Assert.assertNotNull(sink);
+    }
+
+    @Test
+    public void testWriteRow() {
+        sink.init(config, schema);
+
+        Row row = ObjectRow.create(1, "Alice", 25);
+        Assert.assertNotNull(row);
+    }
+
+    @Test
+    public void testMultipleWrites() {
+        sink.init(config, schema);
+
+        for (int i = 0; i < 10; i++) {
+            Row row = ObjectRow.create(i, "User" + i, 20 + i);
+            Assert.assertNotNull(row);
+        }
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSourceTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSourceTest.java
new file mode 100644
index 000000000..1a0a00443
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSourceTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ElasticsearchTableSourceTest {
+
+    private ElasticsearchTableSource source;
+    private Configuration config;
+    private TableSchema schema;
+
+    @BeforeMethod
+    public void setUp() {
+        source = new ElasticsearchTableSource();
+        config = new Configuration();
+        config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, 
"localhost:9200");
+        config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, 
"test_index");
+
+        TableField idField = new TableField("id", Types.INTEGER, false);
+        TableField nameField = new TableField("name", Types.STRING, false);
+        schema = new TableSchema(new StructType(Arrays.asList(idField, 
nameField)));
+    }
+
+    @Test
+    public void testInit() {
+        source.init(config, schema);
+        Assert.assertNotNull(source);
+    }
+
+    @Test(expectedExceptions = RuntimeException.class)
+    public void testInitWithoutIndex() {
+        Configuration invalidConfig = new Configuration();
+        
invalidConfig.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, 
"localhost:9200");
+        source.init(invalidConfig, schema);
+    }
+
+    @Test
+    public void testListPartitions() {
+        source.init(config, schema);
+        List<Partition> partitions = source.listPartitions();
+
+        Assert.assertNotNull(partitions);
+        Assert.assertEquals(partitions.size(), 1);
+        Assert.assertEquals(partitions.get(0).getName(), "test_index");
+    }
+
+    @Test
+    public void testGetDeserializer() {
+        source.init(config, schema);
+        Assert.assertNotNull(source.getDeserializer(config));
+    }
+
+    @Test
+    public void testPartitionName() {
+        ElasticsearchTableSource.ElasticsearchPartition partition =
+                new 
ElasticsearchTableSource.ElasticsearchPartition("my_index");
+        Assert.assertEquals(partition.getName(), "my_index");
+    }
+
+    @Test
+    public void testOffsetHumanReadable() {
+        ElasticsearchTableSource.ElasticsearchOffset offset =
+                new ElasticsearchTableSource.ElasticsearchOffset("scroll_123");
+        Assert.assertTrue(offset.humanReadable().contains("scroll_123"));
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/pom.xml 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/pom.xml
new file mode 100644
index 000000000..4b1aa330f
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>org.apache.geaflow</groupId>
+        <artifactId>geaflow-dsl-connector</artifactId>
+        <version>0.6.8-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>geaflow-dsl-connector-neo4j</artifactId>
+
+    <properties>
+        <neo4j-java-driver.version>4.4.18</neo4j-java-driver.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.geaflow</groupId>
+            <artifactId>geaflow-dsl-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.geaflow</groupId>
+            <artifactId>geaflow-dsl-connector-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.neo4j.driver</groupId>
+            <artifactId>neo4j-java-driver</artifactId>
+            <version>${neo4j-java-driver.version}</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>${testng.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeys.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeys.java
new file mode 100644
index 000000000..6c755547d
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeys.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import static 
org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_BATCH_SIZE;
+import static 
org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_MILLIS;
+import static 
org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_DATABASE;
+import static 
org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_MAX_CONNECTION_LIFETIME_MILLIS;
+import static 
org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_MAX_CONNECTION_POOL_SIZE;
+import static 
org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_NODE_LABEL;
+import static 
org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_RELATIONSHIP_TYPE;
+
+import org.apache.geaflow.common.config.ConfigKey;
+import org.apache.geaflow.common.config.ConfigKeys;
+
+public class Neo4jConfigKeys {
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_URI = ConfigKeys
+        .key("geaflow.dsl.neo4j.uri")
+        .noDefaultValue()
+        .description("Neo4j database URI (e.g., bolt://localhost:7687).");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_USERNAME = ConfigKeys
+        .key("geaflow.dsl.neo4j.username")
+        .noDefaultValue()
+        .description("Neo4j database username.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_PASSWORD = ConfigKeys
+        .key("geaflow.dsl.neo4j.password")
+        .noDefaultValue()
+        .description("Neo4j database password.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_DATABASE = ConfigKeys
+        .key("geaflow.dsl.neo4j.database")
+        .defaultValue(DEFAULT_DATABASE)
+        .description("Neo4j database name.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_BATCH_SIZE = ConfigKeys
+        .key("geaflow.dsl.neo4j.batch.size")
+        .defaultValue(DEFAULT_BATCH_SIZE)
+        .description("Batch size for writing to Neo4j.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_MAX_CONNECTION_LIFETIME = 
ConfigKeys
+        .key("geaflow.dsl.neo4j.max.connection.lifetime.millis")
+        .defaultValue(DEFAULT_MAX_CONNECTION_LIFETIME_MILLIS)
+        .description("Maximum lifetime of a connection in milliseconds.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_MAX_CONNECTION_POOL_SIZE = 
ConfigKeys
+        .key("geaflow.dsl.neo4j.max.connection.pool.size")
+        .defaultValue(DEFAULT_MAX_CONNECTION_POOL_SIZE)
+        .description("Maximum size of the connection pool.");
+
+    public static final ConfigKey 
GEAFLOW_DSL_NEO4J_CONNECTION_ACQUISITION_TIMEOUT = ConfigKeys
+        .key("geaflow.dsl.neo4j.connection.acquisition.timeout.millis")
+        .defaultValue(DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_MILLIS)
+        .description("Timeout for acquiring a connection from the pool in 
milliseconds.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_QUERY = ConfigKeys
+        .key("geaflow.dsl.neo4j.query")
+        .noDefaultValue()
+        .description("Cypher query for reading data from Neo4j.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_NODE_LABEL = ConfigKeys
+        .key("geaflow.dsl.neo4j.node.label")
+        .defaultValue(DEFAULT_NODE_LABEL)
+        .description("Node label for writing nodes to Neo4j.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_RELATIONSHIP_TYPE = 
ConfigKeys
+        .key("geaflow.dsl.neo4j.relationship.type")
+        .defaultValue(DEFAULT_RELATIONSHIP_TYPE)
+        .description("Relationship type for writing relationships to Neo4j.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_WRITE_MODE = ConfigKeys
+        .key("geaflow.dsl.neo4j.write.mode")
+        .defaultValue("node")
+        .description("Write mode: 'node' for writing nodes, 'relationship' for 
writing relationships.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_NODE_ID_FIELD = ConfigKeys
+        .key("geaflow.dsl.neo4j.node.id.field")
+        .noDefaultValue()
+        .description("Field name to use as node ID.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_RELATIONSHIP_SOURCE_FIELD 
= ConfigKeys
+        .key("geaflow.dsl.neo4j.relationship.source.field")
+        .noDefaultValue()
+        .description("Field name for relationship source node ID.");
+
+    public static final ConfigKey GEAFLOW_DSL_NEO4J_RELATIONSHIP_TARGET_FIELD 
= ConfigKeys
+        .key("geaflow.dsl.neo4j.relationship.target.field")
+        .noDefaultValue()
+        .description("Field name for relationship target node ID.");
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConstants.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConstants.java
new file mode 100644
index 000000000..153b1f5c0
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+public class Neo4jConstants {
+
+    public static final String DEFAULT_DATABASE = "neo4j";
+
+    public static final int DEFAULT_BATCH_SIZE = 1000;
+
+    public static final long DEFAULT_MAX_CONNECTION_LIFETIME_MILLIS = 
3600000L; // 1 hour
+
+    public static final int DEFAULT_MAX_CONNECTION_POOL_SIZE = 100;
+
+    public static final long DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_MILLIS = 
60000L; // 1 minute
+
+    public static final String DEFAULT_NODE_LABEL = "Node";
+
+    public static final String DEFAULT_RELATIONSHIP_LABEL = "relationship";
+
+    public static final String DEFAULT_RELATIONSHIP_TYPE = "RELATES_TO";
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnector.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnector.java
new file mode 100644
index 000000000..179298d23
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.connector.api.TableReadableConnector;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.TableWritableConnector;
+
+public class Neo4jTableConnector implements TableReadableConnector, 
TableWritableConnector {
+
+    public static final String TYPE = "Neo4j";
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    @Override
+    public TableSource createSource(Configuration conf) {
+        return new Neo4jTableSource();
+    }
+
+    @Override
+    public TableSink createSink(Configuration conf) {
+        return new Neo4jTableSink();
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java
new file mode 100644
index 000000000..239bc0015
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import static 
org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_NODE_LABEL;
+import static 
org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_RELATIONSHIP_LABEL;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Neo4jTableSink implements TableSink {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jTableSink.class);
+
+    private StructType schema;
+    private String uri;
+    private String username;
+    private String password;
+    private String database;
+    private int batchSize;
+    private String writeMode;
+    private String nodeLabel;
+    private String relationshipType;
+    private String nodeIdField;
+    private String relationshipSourceField;
+    private String relationshipTargetField;
+    private long maxConnectionLifetime;
+    private int maxConnectionPoolSize;
+    private long connectionAcquisitionTimeout;
+
+    private Driver driver;
+    private Session session;
+    private Transaction transaction;
+    private List<Row> batch;
+
+    @Override
+    public void init(Configuration tableConf, StructType schema) {
+        LOGGER.info("Init Neo4j sink with config: {}, \n schema: {}", 
tableConf, schema);
+        this.schema = schema;
+
+        this.uri = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_URI);
+        this.username = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_USERNAME);
+        this.password = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_PASSWORD);
+        this.database = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_DATABASE);
+        this.batchSize = 
tableConf.getInteger(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_BATCH_SIZE);
+        this.writeMode = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_WRITE_MODE);
+        this.nodeLabel = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_LABEL);
+        this.relationshipType = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TYPE);
+        this.nodeIdField = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_ID_FIELD);
+        this.relationshipSourceField = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_SOURCE_FIELD);
+        this.relationshipTargetField = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TARGET_FIELD);
+        this.maxConnectionLifetime = 
tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_LIFETIME);
+        this.maxConnectionPoolSize = 
tableConf.getInteger(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_POOL_SIZE);
+        this.connectionAcquisitionTimeout = 
tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_CONNECTION_ACQUISITION_TIMEOUT);
+
+        validateConfig();
+        this.batch = new ArrayList<>(batchSize);
+    }
+
+    private void validateConfig() {
+        if (uri == null || uri.isEmpty()) {
+            throw new GeaFlowDSLException("Neo4j URI must be specified");
+        }
+        if (username == null || username.isEmpty()) {
+            throw new GeaFlowDSLException("Neo4j username must be specified");
+        }
+        if (password == null || password.isEmpty()) {
+            throw new GeaFlowDSLException("Neo4j password must be specified");
+        }
+        if (DEFAULT_NODE_LABEL.toLowerCase().equals(writeMode)) {
+            if (nodeIdField == null || nodeIdField.isEmpty()) {
+                throw new GeaFlowDSLException("Node ID field must be specified 
for node write mode");
+            }
+        } else if (DEFAULT_RELATIONSHIP_LABEL.equals(writeMode)) {
+            if (relationshipSourceField == null || 
relationshipSourceField.isEmpty() 
+                || relationshipTargetField == null || 
relationshipTargetField.isEmpty()) {
+                throw new GeaFlowDSLException("Relationship source and target 
fields must be specified for relationship write mode");
+            }
+        } else {
+            throw new GeaFlowDSLException("Invalid write mode: " + writeMode + 
". Must be 'node' or 'relationship'");
+        }
+    }
+
+    @Override
+    public void open(RuntimeContext context) {
+        try {
+            Config config = Config.builder()
+                .withMaxConnectionLifetime(maxConnectionLifetime, 
TimeUnit.MILLISECONDS)
+                .withMaxConnectionPoolSize(maxConnectionPoolSize)
+                
.withConnectionAcquisitionTimeout(connectionAcquisitionTimeout, 
TimeUnit.MILLISECONDS)
+                .build();
+
+            this.driver = GraphDatabase.driver(uri, AuthTokens.basic(username, 
password), config);
+            
+            SessionConfig sessionConfig = SessionConfig.builder()
+                .withDatabase(database)
+                .build();
+            
+            this.session = driver.session(sessionConfig);
+            this.transaction = session.beginTransaction();
+            
+            LOGGER.info("Neo4j connection established successfully");
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to connect to Neo4j: " + 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void write(Row row) throws IOException {
+        batch.add(row);
+        if (batch.size() >= batchSize) {
+            flush();
+        }
+    }
+
+    @Override
+    public void finish() throws IOException {
+        if (!batch.isEmpty()) {
+            flush();
+        }
+        try {
+            if (transaction != null) {
+                transaction.commit();
+                transaction.close();
+                transaction = null;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to commit transaction", e);
+            try {
+                if (transaction != null) {
+                    transaction.rollback();
+                }
+            } catch (Exception ex) {
+                throw new GeaFlowDSLException("Failed to rollback 
transaction", ex);
+            }
+            throw new GeaFlowDSLException("Failed to finish writing to Neo4j", 
e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            if (transaction != null) {
+                transaction.close();
+                transaction = null;
+            }
+            if (session != null) {
+                session.close();
+                session = null;
+            }
+            if (driver != null) {
+                driver.close();
+                driver = null;
+            }
+            LOGGER.info("Neo4j connection closed successfully");
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to close Neo4j connection", 
e);
+        }
+    }
+
+    private void flush() {
+        if (batch.isEmpty()) {
+            return;
+        }
+
+        try {
+            if (DEFAULT_NODE_LABEL.toLowerCase().equals(writeMode)) {
+                writeNodes();
+            } else {
+                writeRelationships();
+            }
+            batch.clear();
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to flush batch to Neo4j", e);
+        }
+    }
+
+    private void writeNodes() {
+        List<String> fieldNames = schema.getFieldNames();
+        IType<?>[] types = schema.getTypes();
+        
+        int nodeIdIndex = fieldNames.indexOf(nodeIdField);
+        if (nodeIdIndex == -1) {
+            throw new GeaFlowDSLException("Node ID field not found in schema: 
" + nodeIdField);
+        }
+
+        for (Row row : batch) {
+            Map<String, Object> properties = new HashMap<>();
+            for (int i = 0; i < fieldNames.size(); i++) {
+                if (i == nodeIdIndex) {
+                    continue; // Skip ID field, it will be used as node ID
+                }
+                Object value = row.getField(i, types[i]);
+                if (value != null) {
+                    properties.put(fieldNames.get(i), value);
+                }
+            }
+
+            Object nodeId = row.getField(nodeIdIndex, types[nodeIdIndex]);
+            if (nodeId == null) {
+                throw new GeaFlowDSLException("Node ID cannot be null");
+            }
+
+            String cypher = String.format(
+                "MERGE (n:%s {id: $id}) SET n += $properties",
+                nodeLabel
+            );
+            
+            Map<String, Object> parameters = new HashMap<>();
+            parameters.put("id", nodeId);
+            parameters.put("properties", properties);
+            
+            transaction.run(cypher, parameters);
+        }
+    }
+
+    private void writeRelationships() {
+        List<String> fieldNames = schema.getFieldNames();
+        IType<?>[] types = schema.getTypes();
+        
+        int sourceIndex = fieldNames.indexOf(relationshipSourceField);
+        int targetIndex = fieldNames.indexOf(relationshipTargetField);
+        
+        if (sourceIndex == -1) {
+            throw new GeaFlowDSLException("Relationship source field not found 
in schema: " + relationshipSourceField);
+        }
+        if (targetIndex == -1) {
+            throw new GeaFlowDSLException("Relationship target field not found 
in schema: " + relationshipTargetField);
+        }
+
+        for (Row row : batch) {
+            Object sourceId = row.getField(sourceIndex, types[sourceIndex]);
+            Object targetId = row.getField(targetIndex, types[targetIndex]);
+            
+            if (sourceId == null || targetId == null) {
+                throw new GeaFlowDSLException("Relationship source and target 
IDs cannot be null");
+            }
+
+            Map<String, Object> properties = new HashMap<>();
+            for (int i = 0; i < fieldNames.size(); i++) {
+                if (i == sourceIndex || i == targetIndex) {
+                    continue; // Skip source and target fields
+                }
+                Object value = row.getField(i, types[i]);
+                if (value != null) {
+                    properties.put(fieldNames.get(i), value);
+                }
+            }
+
+            final String cypher = String.format(
+                "MATCH (a {id: $sourceId}), (b {id: $targetId}) "
+                + "MERGE (a)-[r:%s]->(b) SET r += $properties",
+                relationshipType
+            );
+            
+            Map<String, Object> parameters = new HashMap<>();
+            parameters.put("sourceId", sourceId);
+            parameters.put("targetId", targetId);
+            parameters.put("properties", properties);
+            
+            transaction.run(cypher, parameters);
+        }
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSource.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSource.java
new file mode 100644
index 000000000..019e3667d
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSource.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.api.window.WindowType;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Offset;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.serde.DeserializerFactory;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.window.FetchWindow;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Neo4jTableSource implements TableSource {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jTableSource.class);
+
+    private Configuration tableConf;
+    private StructType schema;
+    private String uri;
+    private String username;
+    private String password;
+    private String database;
+    private String cypherQuery;
+    private long maxConnectionLifetime;
+    private int maxConnectionPoolSize;
+    private long connectionAcquisitionTimeout;
+
+    private Driver driver;
+    private Map<Partition, Session> partitionSessionMap = new 
ConcurrentHashMap<>();
+
+    @Override
+    public void init(Configuration tableConf, TableSchema tableSchema) {
+        LOGGER.info("Init Neo4j source with config: {}, \n schema: {}", 
tableConf, tableSchema);
+        this.tableConf = tableConf;
+        this.schema = tableSchema;
+
+        this.uri = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_URI);
+        this.username = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_USERNAME);
+        this.password = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_PASSWORD);
+        this.database = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_DATABASE);
+        this.cypherQuery = 
tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_QUERY);
+        this.maxConnectionLifetime = 
tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_LIFETIME);
+        this.maxConnectionPoolSize = 
tableConf.getInteger(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_POOL_SIZE);
+        this.connectionAcquisitionTimeout = 
tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_CONNECTION_ACQUISITION_TIMEOUT);
+
+        if (cypherQuery == null || cypherQuery.isEmpty()) {
+            throw new GeaFlowDSLException("Neo4j query must be specified");
+        }
+    }
+
+    @Override
+    public void open(RuntimeContext context) {
+        try {
+            Config config = Config.builder()
+                .withMaxConnectionLifetime(maxConnectionLifetime, 
TimeUnit.MILLISECONDS)
+                .withMaxConnectionPoolSize(maxConnectionPoolSize)
+                
.withConnectionAcquisitionTimeout(connectionAcquisitionTimeout, 
TimeUnit.MILLISECONDS)
+                .build();
+
+            this.driver = GraphDatabase.driver(uri, AuthTokens.basic(username, 
password), config);
+            LOGGER.info("Neo4j driver created successfully");
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to create Neo4j driver: " + 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public List<Partition> listPartitions() {
+        // Neo4j doesn't have native partitioning like JDBC
+        // For simplicity, we return a single partition
+        return Collections.singletonList(new Neo4jPartition(cypherQuery));
+    }
+
+    @Override
+    public List<Partition> listPartitions(int parallelism) {
+        return listPartitions();
+    }
+
+    @Override
+    public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
+        return DeserializerFactory.loadRowTableDeserializer();
+    }
+
+    @Override
+    public <T> FetchData<T> fetch(Partition partition, Optional<Offset> 
startOffset,
+                                  FetchWindow windowInfo) throws IOException {
+        if (!(windowInfo.getType() == WindowType.SIZE_TUMBLING_WINDOW
+            || windowInfo.getType() == WindowType.ALL_WINDOW)) {
+            throw new GeaFlowDSLException("Not support window type: {}", 
windowInfo.getType());
+        }
+
+        Neo4jPartition neo4jPartition = (Neo4jPartition) partition;
+        Session session = partitionSessionMap.get(partition);
+        
+        if (session == null) {
+            SessionConfig sessionConfig = SessionConfig.builder()
+                .withDatabase(database)
+                .build();
+            session = driver.session(sessionConfig);
+            partitionSessionMap.put(partition, session);
+        }
+
+        long offset = startOffset.isPresent() ? startOffset.get().getOffset() 
: 0;
+        
+        List<Row> dataList = new ArrayList<>();
+        try {
+            String query = neo4jPartition.getQuery();
+            // Add SKIP and LIMIT to the query for pagination
+            String paginatedQuery = query + " SKIP $skip LIMIT $limit";
+            
+            Map<String, Object> parameters = new HashMap<>();
+            parameters.put("skip", offset);
+            parameters.put("limit", windowInfo.windowSize());
+            
+            Result result = session.run(paginatedQuery, parameters);
+            
+            List<String> fieldNames = schema.getFieldNames();
+            
+            while (result.hasNext()) {
+                Record record = result.next();
+                Object[] values = new Object[fieldNames.size()];
+                
+                for (int i = 0; i < fieldNames.size(); i++) {
+                    String fieldName = fieldNames.get(i);
+                    if (record.containsKey(fieldName)) {
+                        Value value = record.get(fieldName);
+                        values[i] = convertNeo4jValue(value);
+                    } else {
+                        values[i] = null;
+                    }
+                }
+                
+                dataList.add(ObjectRow.create(values));
+            }
+            
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to fetch data from Neo4j", 
e);
+        }
+
+        Neo4jOffset nextOffset = new Neo4jOffset(offset + dataList.size());
+        boolean isFinish = windowInfo.getType() == WindowType.ALL_WINDOW 
+            || dataList.size() < windowInfo.windowSize();
+        
+        return (FetchData<T>) FetchData.createStreamFetch(dataList, 
nextOffset, isFinish);
+    }
+
+    @Override
+    public void close() {
+        try {
+            for (Session session : partitionSessionMap.values()) {
+                if (session != null) {
+                    session.close();
+                }
+            }
+            partitionSessionMap.clear();
+            
+            if (driver != null) {
+                driver.close();
+                driver = null;
+            }
+            LOGGER.info("Neo4j connections closed successfully");
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to close Neo4j connections", 
e);
+        }
+    }
+
+    private Object convertNeo4jValue(Value value) {
+        if (value.isNull()) {
+            return null;
+        }
+        
+        switch (value.type().name()) {
+            case "INTEGER":
+                return value.asLong();
+            case "FLOAT":
+                return value.asDouble();
+            case "STRING":
+                return value.asString();
+            case "BOOLEAN":
+                return value.asBoolean();
+            case "LIST":
+                return value.asList();
+            case "MAP":
+                return value.asMap();
+            case "NODE":
+                return value.asNode().asMap();
+            case "RELATIONSHIP":
+                return value.asRelationship().asMap();
+            case "PATH":
+                return value.asPath().toString();
+            default:
+                return value.asObject();
+        }
+    }
+
+    public static class Neo4jPartition implements Partition {
+
+        private final String query;
+
+        public Neo4jPartition(String query) {
+            this.query = query;
+        }
+
+        public String getQuery() {
+            return query;
+        }
+
+        @Override
+        public String getName() {
+            return "neo4j-partition-" + query.hashCode();
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(query);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof Neo4jPartition)) {
+                return false;
+            }
+            Neo4jPartition that = (Neo4jPartition) o;
+            return Objects.equals(query, that.query);
+        }
+    }
+
+    public static class Neo4jOffset implements Offset {
+
+        private final long offset;
+
+        public Neo4jOffset(long offset) {
+            this.offset = offset;
+        }
+
+        @Override
+        public String humanReadable() {
+            return String.valueOf(offset);
+        }
+
+        @Override
+        public long getOffset() {
+            return offset;
+        }
+
+        @Override
+        public boolean isTimestamp() {
+            return false;
+        }
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
new file mode 100644
index 000000000..42a584079
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.geaflow.dsl.connector.neo4j.Neo4jTableConnector
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeysTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeysTest.java
new file mode 100644
index 000000000..c93dd546d
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeysTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class Neo4jConfigKeysTest {
+
+    @Test
+    public void testDefaultValues() {
+        Assert.assertEquals(Neo4jConstants.DEFAULT_DATABASE, "neo4j");
+        Assert.assertEquals(Neo4jConstants.DEFAULT_BATCH_SIZE, 1000);
+        
Assert.assertEquals(Neo4jConstants.DEFAULT_MAX_CONNECTION_LIFETIME_MILLIS, 
3600000L);
+        Assert.assertEquals(Neo4jConstants.DEFAULT_MAX_CONNECTION_POOL_SIZE, 
100);
+        
Assert.assertEquals(Neo4jConstants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_MILLIS,
 60000L);
+        Assert.assertEquals(Neo4jConstants.DEFAULT_NODE_LABEL, "Node");
+        Assert.assertEquals(Neo4jConstants.DEFAULT_RELATIONSHIP_TYPE, 
"RELATES_TO");
+    }
+
+    @Test
+    public void testConfigKeyNames() {
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_URI.getKey(), 
+            "geaflow.dsl.neo4j.uri");
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_USERNAME.getKey(), 
+            "geaflow.dsl.neo4j.username");
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_PASSWORD.getKey(), 
+            "geaflow.dsl.neo4j.password");
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_DATABASE.getKey(), 
+            "geaflow.dsl.neo4j.database");
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_BATCH_SIZE.getKey(), 
+            "geaflow.dsl.neo4j.batch.size");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_QUERY.getKey(), 
+            "geaflow.dsl.neo4j.query");
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_LABEL.getKey(), 
+            "geaflow.dsl.neo4j.node.label");
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TYPE.getKey(),
 
+            "geaflow.dsl.neo4j.relationship.type");
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_WRITE_MODE.getKey(), 
+            "geaflow.dsl.neo4j.write.mode");
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_ID_FIELD.getKey(), 
+            "geaflow.dsl.neo4j.node.id.field");
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_SOURCE_FIELD.getKey(),
 
+            "geaflow.dsl.neo4j.relationship.source.field");
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TARGET_FIELD.getKey(),
 
+            "geaflow.dsl.neo4j.relationship.target.field");
+    }
+
+    @Test
+    public void testConfigKeyDefaults() {
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_DATABASE.getDefaultValue(),
 
+            Neo4jConstants.DEFAULT_DATABASE);
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_BATCH_SIZE.getDefaultValue(),
 
+            Neo4jConstants.DEFAULT_BATCH_SIZE);
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_LABEL.getDefaultValue(),
 
+            Neo4jConstants.DEFAULT_NODE_LABEL);
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TYPE.getDefaultValue(),
 
+            Neo4jConstants.DEFAULT_RELATIONSHIP_TYPE);
+        
Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_WRITE_MODE.getDefaultValue(),
 
+            "node");
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnectorTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnectorTest.java
new file mode 100644
index 000000000..ed2f3dae3
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnectorTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class Neo4jTableConnectorTest {
+
+    private Neo4jTableConnector connector;
+    private Configuration config;
+
+    @BeforeMethod
+    public void setUp() {
+        connector = new Neo4jTableConnector();
+        config = new Configuration();
+        config.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_URI, 
"bolt://localhost:7687");
+        config.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_USERNAME, "neo4j");
+        config.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_PASSWORD, "password");
+    }
+
+    @Test
+    public void testGetType() {
+        Assert.assertEquals(connector.getType(), "Neo4j");
+    }
+
+    @Test
+    public void testCreateSource() {
+        TableSource source = connector.createSource(config);
+        Assert.assertNotNull(source);
+        Assert.assertTrue(source instanceof Neo4jTableSource);
+    }
+
+    @Test
+    public void testCreateSink() {
+        TableSink sink = connector.createSink(config);
+        Assert.assertNotNull(sink);
+        Assert.assertTrue(sink instanceof Neo4jTableSink);
+    }
+
+    @Test
+    public void testMultipleSourceInstances() {
+        TableSource source1 = connector.createSource(config);
+        TableSource source2 = connector.createSource(config);
+
+        Assert.assertNotNull(source1);
+        Assert.assertNotNull(source2);
+        Assert.assertNotSame(source1, source2);
+    }
+
+    @Test
+    public void testMultipleSinkInstances() {
+        TableSink sink1 = connector.createSink(config);
+        TableSink sink2 = connector.createSink(config);
+
+        Assert.assertNotNull(sink1);
+        Assert.assertNotNull(sink2);
+        Assert.assertNotSame(sink1, sink2);
+    }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
index b7c9821a6..d45d4800d 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
@@ -47,6 +47,8 @@
         <module>geaflow-dsl-connector-pulsar</module>
         <module>geaflow-dsl-connector-random</module>
         <module>geaflow-dsl-connector-paimon</module>
+        <module>geaflow-dsl-connector-neo4j</module>
+        <module>geaflow-dsl-connector-elasticsearch</module>
     </modules>
 
     <dependencyManagement>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to