This is an automated email from the ASF dual-hosted git repository.
loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new 13618a5c3 feat: add neo4j and es connector (#653)
13618a5c3 is described below
commit 13618a5c30ad6be1ca0cea003203d46d3791aa20
Author: shown <[email protected]>
AuthorDate: Tue Dec 2 19:51:52 2025 +0800
feat: add neo4j and es connector (#653)
* # This is a combination of 2 commits.
feat: add neo4j connector
Signed-off-by: jishiwen.jsw <[email protected]>
* fix: adapter jdk 8
Signed-off-by: jishiwen.jsw <[email protected]>
* feat: update by comment
Signed-off-by: jishiwen.jsw <[email protected]>
* fix: fix checkstyle error
Signed-off-by: jishiwen.jsw <[email protected]>
* fix
Signed-off-by: jishiwen.jsw <[email protected]>
---------
Signed-off-by: jishiwen.jsw <[email protected]>
---
.../geaflow-dsl-connector-elasticsearch/pom.xml | 61 +++++
.../elasticsearch/ElasticsearchConfigKeys.java | 71 +++++
.../elasticsearch/ElasticsearchConstants.java | 44 +++
.../elasticsearch/ElasticsearchTableConnector.java | 46 ++++
.../elasticsearch/ElasticsearchTableSink.java | 214 +++++++++++++++
.../elasticsearch/ElasticsearchTableSource.java | 270 ++++++++++++++++++
...apache.geaflow.dsl.connector.api.TableConnector | 19 ++
.../elasticsearch/ElasticsearchConfigKeysTest.java | 67 +++++
.../ElasticsearchTableConnectorTest.java | 92 +++++++
.../elasticsearch/ElasticsearchTableSinkTest.java | 100 +++++++
.../ElasticsearchTableSourceTest.java | 98 +++++++
.../geaflow-dsl-connector-neo4j/pom.xml | 62 +++++
.../dsl/connector/neo4j/Neo4jConfigKeys.java | 109 ++++++++
.../dsl/connector/neo4j/Neo4jConstants.java | 39 +++
.../dsl/connector/neo4j/Neo4jTableConnector.java | 46 ++++
.../dsl/connector/neo4j/Neo4jTableSink.java | 301 +++++++++++++++++++++
.../dsl/connector/neo4j/Neo4jTableSource.java | 300 ++++++++++++++++++++
...apache.geaflow.dsl.connector.api.TableConnector | 20 ++
.../dsl/connector/neo4j/Neo4jConfigKeysTest.java | 79 ++++++
.../connector/neo4j/Neo4jTableConnectorTest.java | 81 ++++++
geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml | 2 +
21 files changed, 2121 insertions(+)
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml
new file mode 100644
index 000000000..8cf12e3bd
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-dsl-connector</artifactId>
+ <version>0.6.8-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>geaflow-dsl-connector-elasticsearch</artifactId>
+ <name>geaflow-dsl-connector-elasticsearch</name>
+
+ <properties>
+ <elasticsearch.version>7.17.10</elasticsearch.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-dsl-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-dsl-connector-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
+ <version>${elasticsearch.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>${testng.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeys.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeys.java
new file mode 100644
index 000000000..65df4b4cd
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeys.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import org.apache.geaflow.common.config.ConfigKey;
+import org.apache.geaflow.common.config.ConfigKeys;
+
+public class ElasticsearchConfigKeys {
+
+ public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_HOSTS = ConfigKeys
+ .key("geaflow.dsl.elasticsearch.hosts")
+ .noDefaultValue()
+ .description("Elasticsearch cluster hosts list.");
+
+ public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_INDEX = ConfigKeys
+ .key("geaflow.dsl.elasticsearch.index")
+ .noDefaultValue()
+ .description("Elasticsearch index name.");
+
+ public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_DOCUMENT_ID_FIELD = ConfigKeys
+ .key("geaflow.dsl.elasticsearch.document.id.field")
+ .noDefaultValue()
+ .description("Elasticsearch document id field.");
+
+ public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_USERNAME = ConfigKeys
+ .key("geaflow.dsl.elasticsearch.username")
+ .noDefaultValue()
+ .description("Elasticsearch username for authentication.");
+
+ public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_PASSWORD = ConfigKeys
+ .key("geaflow.dsl.elasticsearch.password")
+ .noDefaultValue()
+ .description("Elasticsearch password for authentication.");
+
+ public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_BATCH_SIZE = ConfigKeys
+ .key("geaflow.dsl.elasticsearch.batch.size")
+ .defaultValue("1000")
+ .description("Elasticsearch batch write size.");
+
+ public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_SCROLL_TIMEOUT = ConfigKeys
+ .key("geaflow.dsl.elasticsearch.scroll.timeout")
+ .defaultValue("60s")
+ .description("Elasticsearch scroll query timeout.");
+
+ public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_CONNECTION_TIMEOUT = ConfigKeys
+ .key("geaflow.dsl.elasticsearch.connection.timeout")
+ .defaultValue("1000")
+ .description("Elasticsearch connection timeout in milliseconds.");
+
+ public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT = ConfigKeys
+ .key("geaflow.dsl.elasticsearch.socket.timeout")
+ .defaultValue("30000")
+ .description("Elasticsearch socket timeout in milliseconds.");
+}
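
For context, a minimal sketch of how these keys are meant to be set and read on a GeaFlow Configuration. It is illustrative only, not part of this patch; it mirrors the pattern used in ElasticsearchConfigKeysTest further down, and the class name, host and index values are placeholders.

package org.apache.geaflow.dsl.connector.elasticsearch;

import org.apache.geaflow.common.config.Configuration;

public class ElasticsearchConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Required keys: where the cluster lives and which index to use.
        conf.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "http://localhost:9200");
        conf.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "users");
        // Optional key with an explicit override of the "1000" default.
        conf.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BATCH_SIZE, "500");
        // Keys declared with defaultValue(...) fall back to that default when unset.
        String scrollTimeout = conf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SCROLL_TIMEOUT);
        System.out.println("scroll timeout = " + scrollTimeout); // expected "60s"
    }
}
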
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConstants.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConstants.java
new file mode 100644
index 000000000..ed8c7adac
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+public class ElasticsearchConstants {
+
+ public static final int DEFAULT_BATCH_SIZE = 1000;
+
+ public static final String DEFAULT_SCROLL_TIMEOUT = "60s";
+
+ public static final int DEFAULT_CONNECTION_TIMEOUT = 1000;
+
+ public static final int DEFAULT_SOCKET_TIMEOUT = 30000;
+
+ public static final int DEFAULT_SEARCH_SIZE = 1000;
+
+ public static final String ES_SCHEMA_SUFFIX = "://";
+
+ public static final String ES_HTTP_SCHEME = "http";
+
+ public static final String ES_HTTPS_SCHEME = "https";
+
+ public static final String ES_SPLIT_COMMA = ",";
+
+ public static final String ES_SPLIT_COLON = ":";
+
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnector.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnector.java
new file mode 100644
index 000000000..f8950a8d0
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.connector.api.TableReadableConnector;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.TableWritableConnector;
+
+public class ElasticsearchTableConnector implements TableReadableConnector, TableWritableConnector {
+
+ public static final String TYPE = "ELASTICSEARCH";
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public TableSource createSource(Configuration conf) {
+ return new ElasticsearchTableSource();
+ }
+
+ @Override
+ public TableSink createSink(Configuration conf) {
+ return new ElasticsearchTableSink();
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSink.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSink.java
new file mode 100644
index 000000000..4831a6c21
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSink.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.xcontent.XContentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchTableSink implements TableSink {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchTableSink.class);
+ private static final Gson GSON = new Gson();
+
+ private StructType schema;
+ private String hosts;
+ private String indexName;
+ private String documentIdField;
+ private String username;
+ private String password;
+ private int batchSize;
+ private int connectionTimeout;
+ private int socketTimeout;
+
+ private RestHighLevelClient client;
+ private BulkRequest bulkRequest;
+ private int batchCounter = 0;
+
+ @Override
+ public void init(Configuration conf, StructType schema) {
+ LOGGER.info("Prepare with config: {}, \n schema: {}", conf, schema);
+ this.schema = schema;
+
+ this.hosts = conf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS);
+ this.indexName = conf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX);
+ this.documentIdField = conf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_DOCUMENT_ID_FIELD, "");
+ this.username = conf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, "");
+ this.password = conf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, "");
+ this.batchSize = conf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BATCH_SIZE,
+ ElasticsearchConstants.DEFAULT_BATCH_SIZE);
+ this.connectionTimeout = conf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECTION_TIMEOUT,
+ ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT);
+ this.socketTimeout = conf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT,
+ ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT);
+ }
+
+ @Override
+ public void open(RuntimeContext context) {
+ try {
+ this.client = createElasticsearchClient();
+ this.bulkRequest = new BulkRequest();
+ } catch (Exception e) {
+ throw new GeaFlowDSLException("Failed to create Elasticsearch
client", e);
+ }
+ }
+
+ @Override
+ public void write(Row row) throws IOException {
+ // Convert row to JSON document
+ String jsonDocument = rowToJson(row);
+
+ // Create index request
+ IndexRequest request = new IndexRequest(indexName);
+ request.source(jsonDocument, XContentType.JSON);
+
+ // Set document ID if specified
+ if (documentIdField != null && !documentIdField.isEmpty()) {
+ int idFieldIndex = schema.indexOf(documentIdField);
+ if (idFieldIndex >= 0) {
+ Object idValue = row.getField(idFieldIndex, schema.getType(idFieldIndex));
+ if (idValue != null) {
+ request.id(idValue.toString());
+ }
+ }
+ }
+
+ // Add to bulk request
+ bulkRequest.add(request);
+ batchCounter++;
+
+ // Flush if batch size reached
+ if (batchCounter >= batchSize) {
+ flush();
+ }
+ }
+
+ @Override
+ public void finish() throws IOException {
+ flush();
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (Objects.nonNull(this.client)) {
+ client.close();
+ }
+ } catch (IOException e) {
+ throw new GeaFlowDSLException("Failed to close Elasticsearch
client", e);
+ }
+ }
+
+ private void flush() throws IOException {
+ if (batchCounter > 0 && client != null) {
+ BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+ if (bulkResponse.hasFailures()) {
+ LOGGER.error("Bulk request failed: {}", bulkResponse.buildFailureMessage());
+ throw new IOException("Bulk request failed: " + bulkResponse.buildFailureMessage());
+ }
+ bulkRequest = new BulkRequest();
+ batchCounter = 0;
+ }
+ }
+
+ private String rowToJson(Row row) {
+ // Convert Row to JSON string
+ Map<String, Object> map = new HashMap<>();
+ List<String> fieldNames = schema.getFieldNames();
+
+ for (int i = 0; i < fieldNames.size(); i++) {
+ String fieldName = fieldNames.get(i);
+ Object fieldValue = row.getField(i, schema.getType(i));
+ map.put(fieldName, fieldValue);
+ }
+
+ return GSON.toJson(map);
+ }
+
+ private RestHighLevelClient createElasticsearchClient() {
+ try {
+ String[] hostArray = hosts.split(",");
+ HttpHost[] httpHosts = new HttpHost[hostArray.length];
+
+ for (int i = 0; i < hostArray.length; i++) {
+ String host = hostArray[i].trim();
+ if (host.startsWith("http://")) {
+ host = host.substring(7);
+ } else if (host.startsWith("https://")) {
+ host = host.substring(8);
+ }
+
+ String[] parts = host.split(":");
+ String hostname = parts[0];
+ int port = parts.length > 1 ? Integer.parseInt(parts[1]) : 9200;
+ httpHosts[i] = new HttpHost(hostname, port, "http");
+ }
+
+ RestClientBuilder builder = RestClient.builder(httpHosts);
+
+ // Configure timeouts
+ builder.setRequestConfigCallback(requestConfigBuilder -> {
+ requestConfigBuilder.setConnectTimeout(connectionTimeout);
+ requestConfigBuilder.setSocketTimeout(socketTimeout);
+ return requestConfigBuilder;
+ });
+
+ // Configure authentication if provided
+ if (username != null && !username.isEmpty() && password != null) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY,
+ new UsernamePasswordCredentials(username, password));
+
+ builder.setHttpClientConfigCallback(httpClientBuilder -> {
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ return httpClientBuilder;
+ });
+ }
+
+ return new RestHighLevelClient(builder);
+ } catch (Exception e) {
+ throw new GeaFlowDSLException("Failed to create Elasticsearch
client", e);
+ }
+ }
+}
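
To make the write path above easier to follow, here is a hypothetical end-to-end sketch of the sink lifecycle (init -> open -> write -> finish -> close). It is not part of this patch, assumes a reachable Elasticsearch cluster, and only reuses APIs that appear in this diff; passing null to open() relies on the fact that this sink does not read the RuntimeContext there, and the schema/values are made up.

package org.apache.geaflow.dsl.connector.elasticsearch;

import java.util.Arrays;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.type.Types;
import org.apache.geaflow.dsl.common.data.Row;
import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
import org.apache.geaflow.dsl.common.types.StructType;
import org.apache.geaflow.dsl.common.types.TableField;

public class ElasticsearchSinkSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "http://localhost:9200");
        conf.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "users");
        conf.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_DOCUMENT_ID_FIELD, "id");

        StructType schema = new StructType(Arrays.asList(
            new TableField("id", Types.INTEGER, false),
            new TableField("name", Types.STRING, false)));

        ElasticsearchTableSink sink = new ElasticsearchTableSink();
        sink.init(conf, schema);
        sink.open(null); // open() only builds the REST client; the context is not used here.
        Row row = ObjectRow.create(1, "Alice");
        sink.write(row);   // buffered into the current BulkRequest
        sink.finish();     // flushes any pending bulk request
        sink.close();
    }
}
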
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java
new file mode 100644
index 000000000..6fed8c37c
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.DEFAULT_SEARCH_SIZE;
+import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_HTTPS_SCHEME;
+import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_HTTP_SCHEME;
+import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SCHEMA_SUFFIX;
+import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SPLIT_COLON;
+import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SPLIT_COMMA;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Offset;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.window.FetchWindow;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.search.Scroll;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchTableSource implements TableSource {
+
+ private static final Gson GSON = new Gson();
+ private static final Type MAP_TYPE = new TypeToken<Map<String, Object>>(){}.getType();
+
+ private Logger logger = LoggerFactory.getLogger(ElasticsearchTableSource.class);
+
+ private StructType schema;
+ private String hosts;
+ private String indexName;
+ private String username;
+ private String password;
+ private String scrollTimeout;
+ private int connectionTimeout;
+ private int socketTimeout;
+
+ private RestHighLevelClient client;
+
+ @Override
+ public void init(Configuration tableConf, TableSchema tableSchema) {
+ this.schema = tableSchema;
+ this.hosts = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS);
+ this.indexName = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX);
+ this.username = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, "");
+ this.password = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, "");
+ this.scrollTimeout = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SCROLL_TIMEOUT,
+ ElasticsearchConstants.DEFAULT_SCROLL_TIMEOUT);
+ this.connectionTimeout = tableConf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECTION_TIMEOUT,
+ ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT);
+ this.socketTimeout = tableConf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT,
+ ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT);
+ }
+
+ @Override
+ public void open(RuntimeContext context) {
+ try {
+ this.client = createElasticsearchClient();
+ } catch (Exception e) {
+ throw new GeaFlowDSLException("Failed to initialize Elasticsearch
client", e);
+ }
+ }
+
+ @Override
+ public List<Partition> listPartitions() {
+ return Collections.singletonList(new ElasticsearchPartition(indexName));
+ }
+
+ @Override
+ public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
+ return new TableDeserializer<IN>() {
+ @Override
+ public void init(Configuration configuration, StructType structType) {
+ // Initialization if needed
+ }
+
+ @Override
+ public List<Row> deserialize(IN record) {
+ if (record instanceof SearchHit) {
+ SearchHit hit = (SearchHit) record;
+ Map<String, Object> source = hit.getSourceAsMap();
+ if (source == null) {
+ source = GSON.fromJson(hit.getSourceAsString(), MAP_TYPE);
+ }
+
+ // Convert map to Row based on schema
+ Object[] values = new Object[schema.size()];
+ for (int i = 0; i < schema.size(); i++) {
+ String fieldName = schema.getFields().get(i).getName();
+ values[i] = source.get(fieldName);
+ }
+ Row row = ObjectRow.create(values);
+ return Collections.singletonList(row);
+ }
+ return Collections.emptyList();
+ }
+ };
+ }
+
+ @Override
+ public <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset,
+ FetchWindow windowInfo) throws IOException {
+ try {
+ SearchRequest searchRequest = new SearchRequest(indexName);
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.size(DEFAULT_SEARCH_SIZE); // Batch size
+
+ searchRequest.source(searchSourceBuilder);
+
+ // Use scroll for large dataset reading
+ Scroll scroll = new Scroll(TimeValue.parseTimeValue(scrollTimeout, "scroll_timeout"));
+ searchRequest.scroll(scroll);
+
+ SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+ String scrollId = searchResponse.getScrollId();
+ SearchHit[] searchHits = searchResponse.getHits().getHits();
+
+ List<T> dataList = new ArrayList<>();
+ for (SearchHit hit : searchHits) {
+ dataList.add((T) hit);
+ }
+
+ // Clear scroll
+ ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+ clearScrollRequest.addScrollId(scrollId);
+ client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
+
+ ElasticsearchOffset nextOffset = new ElasticsearchOffset(scrollId);
+ return (FetchData<T>) FetchData.createStreamFetch(dataList, nextOffset, false);
+ } catch (Exception e) {
+ throw new IOException("Failed to fetch data from Elasticsearch", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (client != null) {
+ client.close();
+ }
+ } catch (IOException e) {
+ // Log error but don't throw exception in close method
+ logger.warn("Failed to close Elasticsearch client", e);
+ }
+ }
+
+ private RestHighLevelClient createElasticsearchClient() {
+ try {
+ String[] hostArray = hosts.split(ES_SPLIT_COMMA);
+ HttpHost[] httpHosts = new HttpHost[hostArray.length];
+
+ for (int i = 0; i < hostArray.length; i++) {
+ String host = hostArray[i].trim();
+ if (host.startsWith(ES_HTTP_SCHEME + ES_SCHEMA_SUFFIX)) {
+ host = host.substring(7);
+ } else if (host.startsWith(ES_HTTPS_SCHEME + ES_SCHEMA_SUFFIX)) {
+ host = host.substring(8);
+ }
+
+ String[] parts = host.split(ES_SPLIT_COLON);
+ String hostname = parts[0];
+ int port = parts.length > 1 ? Integer.parseInt(parts[1]) : 9200;
+ httpHosts[i] = new HttpHost(hostname, port, ES_HTTP_SCHEME);
+ }
+
+ RestClientBuilder builder = RestClient.builder(httpHosts);
+
+ // Configure timeouts
+ builder.setRequestConfigCallback(requestConfigBuilder -> {
+ requestConfigBuilder.setConnectTimeout(connectionTimeout);
+ requestConfigBuilder.setSocketTimeout(socketTimeout);
+ return requestConfigBuilder;
+ });
+
+ return new RestHighLevelClient(builder);
+ } catch (Exception e) {
+ throw new GeaFlowDSLException("Failed to create Elasticsearch
client", e);
+ }
+ }
+
+ public static class ElasticsearchPartition implements Partition {
+ private final String indexName;
+
+ public ElasticsearchPartition(String indexName) {
+ this.indexName = indexName;
+ }
+
+ @Override
+ public String getName() {
+ return indexName;
+ }
+ }
+
+ public static class ElasticsearchOffset implements Offset {
+ private final String scrollId;
+ private final long timestamp;
+
+ public ElasticsearchOffset(String scrollId) {
+ this(scrollId, System.currentTimeMillis());
+ }
+
+ public ElasticsearchOffset(String scrollId, long timestamp) {
+ this.scrollId = scrollId;
+ this.timestamp = timestamp;
+ }
+
+ public String getScrollId() {
+ return scrollId;
+ }
+
+ @Override
+ public String humanReadable() {
+ return "ElasticsearchOffset{scrollId='" + scrollId + "',
timestamp=" + timestamp + "}";
+ }
+
+ @Override
+ public long getOffset() {
+ return timestamp;
+ }
+
+ @Override
+ public boolean isTimestamp() {
+ return true;
+ }
+ }
+}
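
A matching read-side sketch, again illustrative and not from this patch: the source exposes exactly one partition per index, fetch() pulls a batch of SearchHit objects through the scroll API, and the deserializer returned by getDeserializer() maps each hit onto the table schema. The class name is made up, null is passed to open() because the context is unused there, and a running cluster is assumed.

package org.apache.geaflow.dsl.connector.elasticsearch;

import java.util.Arrays;
import java.util.List;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.type.Types;
import org.apache.geaflow.dsl.common.types.StructType;
import org.apache.geaflow.dsl.common.types.TableField;
import org.apache.geaflow.dsl.common.types.TableSchema;
import org.apache.geaflow.dsl.connector.api.Partition;

public class ElasticsearchSourceSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "http://localhost:9200");
        conf.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "users");

        TableSchema schema = new TableSchema(new StructType(Arrays.asList(
            new TableField("id", Types.INTEGER, false),
            new TableField("name", Types.STRING, false))));

        ElasticsearchTableSource source = new ElasticsearchTableSource();
        source.init(conf, schema);
        source.open(null); // only builds the REST client
        List<Partition> partitions = source.listPartitions();
        System.out.println("partition: " + partitions.get(0).getName()); // the index name
        // source.fetch(partition, Optional.empty(), window) returns a FetchData of SearchHit
        // records, which getDeserializer(conf) then converts into Rows.
        source.close();
    }
}
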
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
new file mode 100644
index 000000000..93f5ad880
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchTableConnector
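
The service file above is what makes the new connector discoverable. Below is a small sketch of the standard Java SPI lookup it enables; the class name is hypothetical, GeaFlow's own connector registry is expected to do something equivalent internally, and it assumes TableConnector exposes getType(), as the @Override in the connector classes suggests.

import java.util.ServiceLoader;
import org.apache.geaflow.dsl.connector.api.TableConnector;

public class ConnectorDiscoverySketch {
    public static void main(String[] args) {
        // Scans META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
        // entries on the classpath, including the one added by this module.
        for (TableConnector connector : ServiceLoader.load(TableConnector.class)) {
            if ("ELASTICSEARCH".equalsIgnoreCase(connector.getType())) {
                System.out.println("Found: " + connector.getClass().getName());
            }
        }
    }
}
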
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeysTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeysTest.java
new file mode 100644
index 000000000..0eaf9cf91
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchConfigKeysTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import org.apache.geaflow.common.config.Configuration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ElasticsearchConfigKeysTest {
+
+ @Test
+ public void testConfigKeys() {
+ Configuration config = new Configuration();
+
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "localhost:9200");
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "test_index");
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, "elastic");
+
+ Assert.assertEquals(config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS),
+ (Object) "localhost:9200");
+ Assert.assertEquals(config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX),
+ (Object) "test_index");
+ Assert.assertEquals(config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME),
+ (Object) "elastic");
+ }
+
+ @Test
+ public void testDefaultValues() {
+ Configuration config = new Configuration();
+
+ String batchSize = config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BATCH_SIZE);
+ String scrollTimeout = config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SCROLL_TIMEOUT);
+
+ Assert.assertEquals(batchSize, (Object) String.valueOf(ElasticsearchConstants.DEFAULT_BATCH_SIZE));
+ Assert.assertEquals(scrollTimeout, (Object) ElasticsearchConstants.DEFAULT_SCROLL_TIMEOUT);
+ }
+
+ @Test
+ public void testTimeoutValues() {
+ Configuration config = new Configuration();
+
+ String connectionTimeout = config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECTION_TIMEOUT);
+ String socketTimeout = config.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT);
+
+ Assert.assertEquals(connectionTimeout,
+ (Object) String.valueOf(ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT));
+ Assert.assertEquals(socketTimeout,
+ (Object) String.valueOf(ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT));
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java
new file mode 100644
index 000000000..a5f594b13
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableConnectorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ElasticsearchTableConnectorTest {
+
+ private ElasticsearchTableConnector connector;
+ private Configuration config;
+ private TableSchema schema;
+
+ @BeforeMethod
+ public void setUp() {
+ connector = new ElasticsearchTableConnector();
+ config = new Configuration();
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "localhost:9200");
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "test_index");
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_DOCUMENT_ID_FIELD, "id");
+
+ TableField idField = new TableField("id", Types.INTEGER, false);
+ TableField nameField = new TableField("name", Types.STRING, false);
+ schema = new TableSchema(new StructType(Arrays.asList(idField, nameField)));
+ }
+
+ @Test
+ public void testGetName() {
+ Assert.assertEquals(connector.getType(), "ELASTICSEARCH");
+ }
+
+ @Test
+ public void testGetSource() {
+ TableSource source = connector.createSource(config);
+ Assert.assertNotNull(source);
+ Assert.assertTrue(source instanceof ElasticsearchTableSource);
+ }
+
+ @Test
+ public void testGetSink() {
+ TableSink sink = connector.createSink(config);
+ Assert.assertNotNull(sink);
+ Assert.assertTrue(sink instanceof ElasticsearchTableSink);
+ }
+
+ @Test
+ public void testMultipleSourceInstances() {
+ TableSource source1 = connector.createSource(config);
+ TableSource source2 = connector.createSource(config);
+
+ Assert.assertNotNull(source1);
+ Assert.assertNotNull(source2);
+ Assert.assertNotSame(source1, source2);
+ }
+
+ @Test
+ public void testMultipleSinkInstances() {
+ TableSink sink1 = connector.createSink(config);
+ TableSink sink2 = connector.createSink(config);
+
+ Assert.assertNotNull(sink1);
+ Assert.assertNotNull(sink2);
+ Assert.assertNotSame(sink1, sink2);
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSinkTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSinkTest.java
new file mode 100644
index 000000000..942839522
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSinkTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ElasticsearchTableSinkTest {
+
+ private ElasticsearchTableSink sink;
+ private Configuration config;
+ private StructType schema;
+
+ @BeforeMethod
+ public void setUp() {
+ sink = new ElasticsearchTableSink();
+ config = new Configuration();
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "localhost:9200");
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "test_index");
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_DOCUMENT_ID_FIELD, "id");
+
+ TableField idField = new TableField("id", Types.INTEGER, false);
+ TableField nameField = new TableField("name", Types.STRING, false);
+ TableField ageField = new TableField("age", Types.INTEGER, false);
+ schema = new StructType(Arrays.asList(idField, nameField, ageField));
+ }
+
+ @Test
+ public void testInit() {
+ sink.init(config, schema);
+ Assert.assertNotNull(sink);
+ }
+
+ @Test(expectedExceptions = RuntimeException.class)
+ public void testInitWithoutIndex() {
+ Configuration invalidConfig = new Configuration();
+ invalidConfig.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "localhost:9200");
+ sink.init(invalidConfig, schema);
+ }
+
+ @Test
+ public void testInitWithoutIdField() {
+ Configuration invalidConfig = new Configuration();
+ invalidConfig.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "localhost:9200");
+ invalidConfig.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "test_index");
+ sink.init(invalidConfig, schema);
+ Assert.assertNotNull(sink);
+ }
+
+ @Test
+ public void testBatchSizeConfiguration() {
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_BATCH_SIZE, "500");
+ sink.init(config, schema);
+ Assert.assertNotNull(sink);
+ }
+
+ @Test
+ public void testWriteRow() {
+ sink.init(config, schema);
+
+ Row row = ObjectRow.create(1, "Alice", 25);
+ Assert.assertNotNull(row);
+ }
+
+ @Test
+ public void testMultipleWrites() {
+ sink.init(config, schema);
+
+ for (int i = 0; i < 10; i++) {
+ Row row = ObjectRow.create(i, "User" + i, 20 + i);
+ Assert.assertNotNull(row);
+ }
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSourceTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSourceTest.java
new file mode 100644
index 000000000..1a0a00443
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSourceTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.elasticsearch;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ElasticsearchTableSourceTest {
+
+ private ElasticsearchTableSource source;
+ private Configuration config;
+ private TableSchema schema;
+
+ @BeforeMethod
+ public void setUp() {
+ source = new ElasticsearchTableSource();
+ config = new Configuration();
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "localhost:9200");
+ config.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX, "test_index");
+
+ TableField idField = new TableField("id", Types.INTEGER, false);
+ TableField nameField = new TableField("name", Types.STRING, false);
+ schema = new TableSchema(new StructType(Arrays.asList(idField, nameField)));
+ }
+
+ @Test
+ public void testInit() {
+ source.init(config, schema);
+ Assert.assertNotNull(source);
+ }
+
+ @Test(expectedExceptions = RuntimeException.class)
+ public void testInitWithoutIndex() {
+ Configuration invalidConfig = new Configuration();
+ invalidConfig.put(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS, "localhost:9200");
+ source.init(invalidConfig, schema);
+ }
+
+ @Test
+ public void testListPartitions() {
+ source.init(config, schema);
+ List<Partition> partitions = source.listPartitions();
+
+ Assert.assertNotNull(partitions);
+ Assert.assertEquals(partitions.size(), 1);
+ Assert.assertEquals(partitions.get(0).getName(), "test_index");
+ }
+
+ @Test
+ public void testGetDeserializer() {
+ source.init(config, schema);
+ Assert.assertNotNull(source.getDeserializer(config));
+ }
+
+ @Test
+ public void testPartitionName() {
+ ElasticsearchTableSource.ElasticsearchPartition partition =
+ new ElasticsearchTableSource.ElasticsearchPartition("my_index");
+ Assert.assertEquals(partition.getName(), "my_index");
+ }
+
+ @Test
+ public void testOffsetHumanReadable() {
+ ElasticsearchTableSource.ElasticsearchOffset offset =
+ new ElasticsearchTableSource.ElasticsearchOffset("scroll_123");
+ Assert.assertTrue(offset.humanReadable().contains("scroll_123"));
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/pom.xml
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/pom.xml
new file mode 100644
index 000000000..4b1aa330f
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-dsl-connector</artifactId>
+ <version>0.6.8-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>geaflow-dsl-connector-neo4j</artifactId>
+
+ <properties>
+ <neo4j-java-driver.version>4.4.18</neo4j-java-driver.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-dsl-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-dsl-connector-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.neo4j.driver</groupId>
+ <artifactId>neo4j-java-driver</artifactId>
+ <version>${neo4j-java-driver.version}</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>${testng.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeys.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeys.java
new file mode 100644
index 000000000..6c755547d
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeys.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_BATCH_SIZE;
+import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_MILLIS;
+import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_DATABASE;
+import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_MAX_CONNECTION_LIFETIME_MILLIS;
+import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_MAX_CONNECTION_POOL_SIZE;
+import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_NODE_LABEL;
+import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_RELATIONSHIP_TYPE;
+
+import org.apache.geaflow.common.config.ConfigKey;
+import org.apache.geaflow.common.config.ConfigKeys;
+
+public class Neo4jConfigKeys {
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_URI = ConfigKeys
+ .key("geaflow.dsl.neo4j.uri")
+ .noDefaultValue()
+ .description("Neo4j database URI (e.g., bolt://localhost:7687).");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_USERNAME = ConfigKeys
+ .key("geaflow.dsl.neo4j.username")
+ .noDefaultValue()
+ .description("Neo4j database username.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_PASSWORD = ConfigKeys
+ .key("geaflow.dsl.neo4j.password")
+ .noDefaultValue()
+ .description("Neo4j database password.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_DATABASE = ConfigKeys
+ .key("geaflow.dsl.neo4j.database")
+ .defaultValue(DEFAULT_DATABASE)
+ .description("Neo4j database name.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_BATCH_SIZE = ConfigKeys
+ .key("geaflow.dsl.neo4j.batch.size")
+ .defaultValue(DEFAULT_BATCH_SIZE)
+ .description("Batch size for writing to Neo4j.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_MAX_CONNECTION_LIFETIME = ConfigKeys
+ .key("geaflow.dsl.neo4j.max.connection.lifetime.millis")
+ .defaultValue(DEFAULT_MAX_CONNECTION_LIFETIME_MILLIS)
+ .description("Maximum lifetime of a connection in milliseconds.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_MAX_CONNECTION_POOL_SIZE = ConfigKeys
+ .key("geaflow.dsl.neo4j.max.connection.pool.size")
+ .defaultValue(DEFAULT_MAX_CONNECTION_POOL_SIZE)
+ .description("Maximum size of the connection pool.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_CONNECTION_ACQUISITION_TIMEOUT = ConfigKeys
+ .key("geaflow.dsl.neo4j.connection.acquisition.timeout.millis")
+ .defaultValue(DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_MILLIS)
+ .description("Timeout for acquiring a connection from the pool in milliseconds.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_QUERY = ConfigKeys
+ .key("geaflow.dsl.neo4j.query")
+ .noDefaultValue()
+ .description("Cypher query for reading data from Neo4j.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_NODE_LABEL = ConfigKeys
+ .key("geaflow.dsl.neo4j.node.label")
+ .defaultValue(DEFAULT_NODE_LABEL)
+ .description("Node label for writing nodes to Neo4j.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_RELATIONSHIP_TYPE = ConfigKeys
+ .key("geaflow.dsl.neo4j.relationship.type")
+ .defaultValue(DEFAULT_RELATIONSHIP_TYPE)
+ .description("Relationship type for writing relationships to Neo4j.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_WRITE_MODE = ConfigKeys
+ .key("geaflow.dsl.neo4j.write.mode")
+ .defaultValue("node")
+ .description("Write mode: 'node' for writing nodes, 'relationship' for
writing relationships.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_NODE_ID_FIELD = ConfigKeys
+ .key("geaflow.dsl.neo4j.node.id.field")
+ .noDefaultValue()
+ .description("Field name to use as node ID.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_RELATIONSHIP_SOURCE_FIELD = ConfigKeys
+ .key("geaflow.dsl.neo4j.relationship.source.field")
+ .noDefaultValue()
+ .description("Field name for relationship source node ID.");
+
+ public static final ConfigKey GEAFLOW_DSL_NEO4J_RELATIONSHIP_TARGET_FIELD = ConfigKeys
+ .key("geaflow.dsl.neo4j.relationship.target.field")
+ .noDefaultValue()
+ .description("Field name for relationship target node ID.");
+}
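
As with the Elasticsearch keys, a short illustrative sketch (not part of this patch) of configuring the Neo4j connector, here for the 'relationship' write mode; the class name, URI, credentials and field names are placeholders.

package org.apache.geaflow.dsl.connector.neo4j;

import org.apache.geaflow.common.config.Configuration;

public class Neo4jConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_URI, "bolt://localhost:7687");
        conf.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_USERNAME, "neo4j");
        conf.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_PASSWORD, "secret");
        // Write relationships instead of nodes (the default write mode is "node").
        conf.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_WRITE_MODE, "relationship");
        conf.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TYPE, "KNOWS");
        conf.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_SOURCE_FIELD, "srcId");
        conf.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TARGET_FIELD, "targetId");
        // Unset keys keep the defaults declared above from Neo4jConstants.
        System.out.println(conf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_DATABASE)); // "neo4j"
    }
}
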
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConstants.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConstants.java
new file mode 100644
index 000000000..153b1f5c0
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+public class Neo4jConstants {
+
+ public static final String DEFAULT_DATABASE = "neo4j";
+
+ public static final int DEFAULT_BATCH_SIZE = 1000;
+
+    public static final long DEFAULT_MAX_CONNECTION_LIFETIME_MILLIS = 3600000L; // 1 hour
+
+    public static final int DEFAULT_MAX_CONNECTION_POOL_SIZE = 100;
+
+    public static final long DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_MILLIS = 60000L; // 1 minute
+
+ public static final String DEFAULT_NODE_LABEL = "Node";
+
+ public static final String DEFAULT_RELATIONSHIP_LABEL = "relationship";
+
+ public static final String DEFAULT_RELATIONSHIP_TYPE = "RELATES_TO";
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnector.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnector.java
new file mode 100644
index 000000000..179298d23
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.connector.api.TableReadableConnector;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.TableWritableConnector;
+
+public class Neo4jTableConnector implements TableReadableConnector, TableWritableConnector {
+
+ public static final String TYPE = "Neo4j";
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public TableSource createSource(Configuration conf) {
+ return new Neo4jTableSource();
+ }
+
+ @Override
+ public TableSink createSink(Configuration conf) {
+ return new Neo4jTableSink();
+ }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java
new file mode 100644
index 000000000..239bc0015
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_NODE_LABEL;
+import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_RELATIONSHIP_LABEL;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Neo4jTableSink implements TableSink {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jTableSink.class);
+
+ private StructType schema;
+ private String uri;
+ private String username;
+ private String password;
+ private String database;
+ private int batchSize;
+ private String writeMode;
+ private String nodeLabel;
+ private String relationshipType;
+ private String nodeIdField;
+ private String relationshipSourceField;
+ private String relationshipTargetField;
+ private long maxConnectionLifetime;
+ private int maxConnectionPoolSize;
+ private long connectionAcquisitionTimeout;
+
+ private Driver driver;
+ private Session session;
+ private Transaction transaction;
+ private List<Row> batch;
+
+ @Override
+ public void init(Configuration tableConf, StructType schema) {
+        LOGGER.info("Init Neo4j sink with config: {}, \n schema: {}", tableConf, schema);
+        this.schema = schema;
+
+        this.uri = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_URI);
+        this.username = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_USERNAME);
+        this.password = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_PASSWORD);
+        this.database = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_DATABASE);
+        this.batchSize = tableConf.getInteger(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_BATCH_SIZE);
+        this.writeMode = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_WRITE_MODE);
+        this.nodeLabel = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_LABEL);
+        this.relationshipType = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TYPE);
+        this.nodeIdField = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_ID_FIELD);
+        this.relationshipSourceField = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_SOURCE_FIELD);
+        this.relationshipTargetField = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TARGET_FIELD);
+        this.maxConnectionLifetime = tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_LIFETIME);
+        this.maxConnectionPoolSize = tableConf.getInteger(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_POOL_SIZE);
+        this.connectionAcquisitionTimeout = tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_CONNECTION_ACQUISITION_TIMEOUT);
+
+ validateConfig();
+ this.batch = new ArrayList<>(batchSize);
+ }
+
+ private void validateConfig() {
+ if (uri == null || uri.isEmpty()) {
+ throw new GeaFlowDSLException("Neo4j URI must be specified");
+ }
+ if (username == null || username.isEmpty()) {
+ throw new GeaFlowDSLException("Neo4j username must be specified");
+ }
+ if (password == null || password.isEmpty()) {
+ throw new GeaFlowDSLException("Neo4j password must be specified");
+ }
+ if (DEFAULT_NODE_LABEL.toLowerCase().equals(writeMode)) {
+ if (nodeIdField == null || nodeIdField.isEmpty()) {
+                throw new GeaFlowDSLException("Node ID field must be specified for node write mode");
+            }
+        } else if (DEFAULT_RELATIONSHIP_LABEL.equals(writeMode)) {
+            if (relationshipSourceField == null || relationshipSourceField.isEmpty()
+                || relationshipTargetField == null || relationshipTargetField.isEmpty()) {
+                throw new GeaFlowDSLException("Relationship source and target fields must be specified for relationship write mode");
+            }
+        } else {
+            throw new GeaFlowDSLException("Invalid write mode: " + writeMode + ". Must be 'node' or 'relationship'");
+ }
+ }
+
+ @Override
+ public void open(RuntimeContext context) {
+ try {
+            Config config = Config.builder()
+                .withMaxConnectionLifetime(maxConnectionLifetime, TimeUnit.MILLISECONDS)
+                .withMaxConnectionPoolSize(maxConnectionPoolSize)
+                .withConnectionAcquisitionTimeout(connectionAcquisitionTimeout, TimeUnit.MILLISECONDS)
+                .build();
+
+            this.driver = GraphDatabase.driver(uri, AuthTokens.basic(username, password), config);
+
+ SessionConfig sessionConfig = SessionConfig.builder()
+ .withDatabase(database)
+ .build();
+
+ this.session = driver.session(sessionConfig);
+ this.transaction = session.beginTransaction();
+
+ LOGGER.info("Neo4j connection established successfully");
+ } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to connect to Neo4j: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void write(Row row) throws IOException {
+ batch.add(row);
+ if (batch.size() >= batchSize) {
+ flush();
+ }
+ }
+
+ @Override
+ public void finish() throws IOException {
+ if (!batch.isEmpty()) {
+ flush();
+ }
+ try {
+ if (transaction != null) {
+ transaction.commit();
+ transaction.close();
+ transaction = null;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to commit transaction", e);
+ try {
+ if (transaction != null) {
+ transaction.rollback();
+ }
+ } catch (Exception ex) {
+                throw new GeaFlowDSLException("Failed to rollback transaction", ex);
+            }
+            throw new GeaFlowDSLException("Failed to finish writing to Neo4j", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (transaction != null) {
+ transaction.close();
+ transaction = null;
+ }
+ if (session != null) {
+ session.close();
+ session = null;
+ }
+ if (driver != null) {
+ driver.close();
+ driver = null;
+ }
+ LOGGER.info("Neo4j connection closed successfully");
+ } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to close Neo4j connection", e);
+ }
+ }
+
+ private void flush() {
+ if (batch.isEmpty()) {
+ return;
+ }
+
+ try {
+ if (DEFAULT_NODE_LABEL.toLowerCase().equals(writeMode)) {
+ writeNodes();
+ } else {
+ writeRelationships();
+ }
+ batch.clear();
+ } catch (Exception e) {
+ throw new GeaFlowDSLException("Failed to flush batch to Neo4j", e);
+ }
+ }
+
+ private void writeNodes() {
+ List<String> fieldNames = schema.getFieldNames();
+ IType<?>[] types = schema.getTypes();
+
+ int nodeIdIndex = fieldNames.indexOf(nodeIdField);
+ if (nodeIdIndex == -1) {
+            throw new GeaFlowDSLException("Node ID field not found in schema: " + nodeIdField);
+ }
+
+ for (Row row : batch) {
+ Map<String, Object> properties = new HashMap<>();
+ for (int i = 0; i < fieldNames.size(); i++) {
+ if (i == nodeIdIndex) {
+ continue; // Skip ID field, it will be used as node ID
+ }
+ Object value = row.getField(i, types[i]);
+ if (value != null) {
+ properties.put(fieldNames.get(i), value);
+ }
+ }
+
+ Object nodeId = row.getField(nodeIdIndex, types[nodeIdIndex]);
+ if (nodeId == null) {
+ throw new GeaFlowDSLException("Node ID cannot be null");
+ }
+
+ String cypher = String.format(
+ "MERGE (n:%s {id: $id}) SET n += $properties",
+ nodeLabel
+ );
+
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("id", nodeId);
+ parameters.put("properties", properties);
+
+ transaction.run(cypher, parameters);
+ }
+ }
+
+ private void writeRelationships() {
+ List<String> fieldNames = schema.getFieldNames();
+ IType<?>[] types = schema.getTypes();
+
+ int sourceIndex = fieldNames.indexOf(relationshipSourceField);
+ int targetIndex = fieldNames.indexOf(relationshipTargetField);
+
+ if (sourceIndex == -1) {
+            throw new GeaFlowDSLException("Relationship source field not found in schema: " + relationshipSourceField);
+        }
+        if (targetIndex == -1) {
+            throw new GeaFlowDSLException("Relationship target field not found in schema: " + relationshipTargetField);
+ }
+
+ for (Row row : batch) {
+ Object sourceId = row.getField(sourceIndex, types[sourceIndex]);
+ Object targetId = row.getField(targetIndex, types[targetIndex]);
+
+ if (sourceId == null || targetId == null) {
+                throw new GeaFlowDSLException("Relationship source and target IDs cannot be null");
+ }
+
+ Map<String, Object> properties = new HashMap<>();
+ for (int i = 0; i < fieldNames.size(); i++) {
+ if (i == sourceIndex || i == targetIndex) {
+ continue; // Skip source and target fields
+ }
+ Object value = row.getField(i, types[i]);
+ if (value != null) {
+ properties.put(fieldNames.get(i), value);
+ }
+ }
+
+ final String cypher = String.format(
+ "MATCH (a {id: $sourceId}), (b {id: $targetId}) "
+ + "MERGE (a)-[r:%s]->(b) SET r += $properties",
+ relationshipType
+ );
+
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("sourceId", sourceId);
+ parameters.put("targetId", targetId);
+ parameters.put("properties", properties);
+
+ transaction.run(cypher, parameters);
+ }
+ }
+}
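
The sink buffers rows and flushes them inside one transaction, issuing a parameterized MERGE per row. A standalone sketch of the node-mode upsert against the Neo4j Java driver (4.x-style API, matching the Transaction#commit/rollback calls the sink itself uses); the class name, endpoint, credentials and sample row are placeholders:

    package org.apache.geaflow.dsl.connector.neo4j;

    import java.util.HashMap;
    import java.util.Map;
    import org.neo4j.driver.AuthTokens;
    import org.neo4j.driver.Driver;
    import org.neo4j.driver.GraphDatabase;
    import org.neo4j.driver.Session;
    import org.neo4j.driver.Transaction;

    public class Neo4jNodeUpsertExample {

        public static void main(String[] args) {
            // Placeholder endpoint and credentials.
            try (Driver driver = GraphDatabase.driver("bolt://localhost:7687",
                    AuthTokens.basic("neo4j", "secret"));
                 Session session = driver.session();
                 Transaction tx = session.beginTransaction()) {

                // The configured node.id.field becomes the MERGE key; every other
                // non-null column is folded into the $properties map.
                Map<String, Object> properties = new HashMap<>();
                properties.put("name", "alice");
                properties.put("age", 30);

                Map<String, Object> parameters = new HashMap<>();
                parameters.put("id", 1L);
                parameters.put("properties", properties);

                tx.run("MERGE (n:Node {id: $id}) SET n += $properties", parameters);
                tx.commit();
            }
        }
    }

Relationship mode follows the same pattern, matching the two endpoint nodes by id and merging the relationship between them, which is why null source or target ids are rejected before the statement is built.
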
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSource.java
new file mode 100644
index 000000000..019e3667d
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSource.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.api.window.WindowType;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Offset;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.serde.DeserializerFactory;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.window.FetchWindow;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Neo4jTableSource implements TableSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jTableSource.class);
+
+ private Configuration tableConf;
+ private StructType schema;
+ private String uri;
+ private String username;
+ private String password;
+ private String database;
+ private String cypherQuery;
+ private long maxConnectionLifetime;
+ private int maxConnectionPoolSize;
+ private long connectionAcquisitionTimeout;
+
+ private Driver driver;
+    private Map<Partition, Session> partitionSessionMap = new ConcurrentHashMap<>();
+
+ @Override
+ public void init(Configuration tableConf, TableSchema tableSchema) {
+        LOGGER.info("Init Neo4j source with config: {}, \n schema: {}", tableConf, tableSchema);
+        this.tableConf = tableConf;
+        this.schema = tableSchema;
+
+        this.uri = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_URI);
+        this.username = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_USERNAME);
+        this.password = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_PASSWORD);
+        this.database = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_DATABASE);
+        this.cypherQuery = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_QUERY);
+        this.maxConnectionLifetime = tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_LIFETIME);
+        this.maxConnectionPoolSize = tableConf.getInteger(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_POOL_SIZE);
+        this.connectionAcquisitionTimeout = tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_CONNECTION_ACQUISITION_TIMEOUT);
+
+ if (cypherQuery == null || cypherQuery.isEmpty()) {
+ throw new GeaFlowDSLException("Neo4j query must be specified");
+ }
+ }
+
+ @Override
+ public void open(RuntimeContext context) {
+ try {
+            Config config = Config.builder()
+                .withMaxConnectionLifetime(maxConnectionLifetime, TimeUnit.MILLISECONDS)
+                .withMaxConnectionPoolSize(maxConnectionPoolSize)
+                .withConnectionAcquisitionTimeout(connectionAcquisitionTimeout, TimeUnit.MILLISECONDS)
+                .build();
+
+            this.driver = GraphDatabase.driver(uri, AuthTokens.basic(username, password), config);
+            LOGGER.info("Neo4j driver created successfully");
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to create Neo4j driver: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public List<Partition> listPartitions() {
+ // Neo4j doesn't have native partitioning like JDBC
+ // For simplicity, we return a single partition
+ return Collections.singletonList(new Neo4jPartition(cypherQuery));
+ }
+
+ @Override
+ public List<Partition> listPartitions(int parallelism) {
+ return listPartitions();
+ }
+
+ @Override
+ public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
+ return DeserializerFactory.loadRowTableDeserializer();
+ }
+
+ @Override
+    public <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset,
+                                  FetchWindow windowInfo) throws IOException {
+        if (!(windowInfo.getType() == WindowType.SIZE_TUMBLING_WINDOW
+            || windowInfo.getType() == WindowType.ALL_WINDOW)) {
+            throw new GeaFlowDSLException("Not support window type: {}", windowInfo.getType());
+ }
+
+ Neo4jPartition neo4jPartition = (Neo4jPartition) partition;
+ Session session = partitionSessionMap.get(partition);
+
+ if (session == null) {
+ SessionConfig sessionConfig = SessionConfig.builder()
+ .withDatabase(database)
+ .build();
+ session = driver.session(sessionConfig);
+ partitionSessionMap.put(partition, session);
+ }
+
+        long offset = startOffset.isPresent() ? startOffset.get().getOffset() : 0;
+
+ List<Row> dataList = new ArrayList<>();
+ try {
+ String query = neo4jPartition.getQuery();
+ // Add SKIP and LIMIT to the query for pagination
+ String paginatedQuery = query + " SKIP $skip LIMIT $limit";
+
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("skip", offset);
+ parameters.put("limit", windowInfo.windowSize());
+
+ Result result = session.run(paginatedQuery, parameters);
+
+ List<String> fieldNames = schema.getFieldNames();
+
+ while (result.hasNext()) {
+ Record record = result.next();
+ Object[] values = new Object[fieldNames.size()];
+
+ for (int i = 0; i < fieldNames.size(); i++) {
+ String fieldName = fieldNames.get(i);
+ if (record.containsKey(fieldName)) {
+ Value value = record.get(fieldName);
+ values[i] = convertNeo4jValue(value);
+ } else {
+ values[i] = null;
+ }
+ }
+
+ dataList.add(ObjectRow.create(values));
+ }
+
+ } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to fetch data from Neo4j", e);
+ }
+
+ Neo4jOffset nextOffset = new Neo4jOffset(offset + dataList.size());
+ boolean isFinish = windowInfo.getType() == WindowType.ALL_WINDOW
+ || dataList.size() < windowInfo.windowSize();
+
+        return (FetchData<T>) FetchData.createStreamFetch(dataList, nextOffset, isFinish);
+ }
+
+ @Override
+ public void close() {
+ try {
+ for (Session session : partitionSessionMap.values()) {
+ if (session != null) {
+ session.close();
+ }
+ }
+ partitionSessionMap.clear();
+
+ if (driver != null) {
+ driver.close();
+ driver = null;
+ }
+ LOGGER.info("Neo4j connections closed successfully");
+ } catch (Exception e) {
+            throw new GeaFlowDSLException("Failed to close Neo4j connections", e);
+ }
+ }
+
+ private Object convertNeo4jValue(Value value) {
+ if (value.isNull()) {
+ return null;
+ }
+
+ switch (value.type().name()) {
+ case "INTEGER":
+ return value.asLong();
+ case "FLOAT":
+ return value.asDouble();
+ case "STRING":
+ return value.asString();
+ case "BOOLEAN":
+ return value.asBoolean();
+ case "LIST":
+ return value.asList();
+ case "MAP":
+ return value.asMap();
+ case "NODE":
+ return value.asNode().asMap();
+ case "RELATIONSHIP":
+ return value.asRelationship().asMap();
+ case "PATH":
+ return value.asPath().toString();
+ default:
+ return value.asObject();
+ }
+ }
+
+ public static class Neo4jPartition implements Partition {
+
+ private final String query;
+
+ public Neo4jPartition(String query) {
+ this.query = query;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ @Override
+ public String getName() {
+ return "neo4j-partition-" + query.hashCode();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(query);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Neo4jPartition)) {
+ return false;
+ }
+ Neo4jPartition that = (Neo4jPartition) o;
+ return Objects.equals(query, that.query);
+ }
+ }
+
+ public static class Neo4jOffset implements Offset {
+
+ private final long offset;
+
+ public Neo4jOffset(long offset) {
+ this.offset = offset;
+ }
+
+ @Override
+ public String humanReadable() {
+ return String.valueOf(offset);
+ }
+
+ @Override
+ public long getOffset() {
+ return offset;
+ }
+
+ @Override
+ public boolean isTimestamp() {
+ return false;
+ }
+ }
+}
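
Reads are paginated by appending SKIP/LIMIT to the configured Cypher statement and carrying the number of returned rows forward as the next offset; a page shorter than the window size marks the stream finished. A driver-only sketch of that loop (the class name, query, page size, endpoint and credentials are placeholders, and record handling is elided to a counter):

    package org.apache.geaflow.dsl.connector.neo4j;

    import java.util.HashMap;
    import java.util.Map;
    import org.neo4j.driver.AuthTokens;
    import org.neo4j.driver.Driver;
    import org.neo4j.driver.GraphDatabase;
    import org.neo4j.driver.Result;
    import org.neo4j.driver.Session;

    public class Neo4jPaginationExample {

        public static void main(String[] args) {
            String query = "MATCH (n:Person) RETURN n.id AS id, n.name AS name"; // placeholder query
            long pageSize = 500L;  // stands in for the fetch window size
            long offset = 0L;      // stands in for Neo4jOffset

            try (Driver driver = GraphDatabase.driver("bolt://localhost:7687",
                    AuthTokens.basic("neo4j", "secret"));
                 Session session = driver.session()) {

                while (true) {
                    Map<String, Object> parameters = new HashMap<>();
                    parameters.put("skip", offset);
                    parameters.put("limit", pageSize);

                    // Same rewrite the source applies: pagination is appended to the user query.
                    Result result = session.run(query + " SKIP $skip LIMIT $limit", parameters);

                    long fetched = 0L;
                    while (result.hasNext()) {
                        result.next();  // Neo4jTableSource maps each Record onto the table schema here.
                        fetched++;
                    }

                    offset += fetched;          // becomes the next Neo4jOffset
                    if (fetched < pageSize) {
                        break;                  // short page => isFinish = true
                    }
                }
            }
        }
    }
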
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
new file mode 100644
index 000000000..42a584079
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.geaflow.dsl.connector.neo4j.Neo4jTableConnector
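
This service entry is what makes the connector discoverable: a connector type named in a DSL statement is expected to be resolved to a TableConnector implementation through the standard JDK ServiceLoader scan of this file. A minimal sketch of that lookup (the filter-by-type loop and class name are illustrative, not the registry code from this repository):

    package org.apache.geaflow.dsl.connector.neo4j;

    import java.util.ServiceLoader;
    import org.apache.geaflow.dsl.connector.api.TableConnector;

    public class ConnectorDiscoveryExample {

        public static void main(String[] args) {
            // Scans every META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
            // resource on the classpath, including the entry added above.
            for (TableConnector connector : ServiceLoader.load(TableConnector.class)) {
                if ("Neo4j".equalsIgnoreCase(connector.getType())) {
                    System.out.println("Resolved: " + connector.getClass().getName());
                }
            }
        }
    }
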
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeysTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeysTest.java
new file mode 100644
index 000000000..c93dd546d
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jConfigKeysTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class Neo4jConfigKeysTest {
+
+ @Test
+ public void testDefaultValues() {
+ Assert.assertEquals(Neo4jConstants.DEFAULT_DATABASE, "neo4j");
+ Assert.assertEquals(Neo4jConstants.DEFAULT_BATCH_SIZE, 1000);
+        Assert.assertEquals(Neo4jConstants.DEFAULT_MAX_CONNECTION_LIFETIME_MILLIS, 3600000L);
+        Assert.assertEquals(Neo4jConstants.DEFAULT_MAX_CONNECTION_POOL_SIZE, 100);
+        Assert.assertEquals(Neo4jConstants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_MILLIS, 60000L);
+        Assert.assertEquals(Neo4jConstants.DEFAULT_NODE_LABEL, "Node");
+        Assert.assertEquals(Neo4jConstants.DEFAULT_RELATIONSHIP_TYPE, "RELATES_TO");
+ }
+
+ @Test
+ public void testConfigKeyNames() {
+ Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_URI.getKey(),
+ "geaflow.dsl.neo4j.uri");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_USERNAME.getKey(),
+            "geaflow.dsl.neo4j.username");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_PASSWORD.getKey(),
+            "geaflow.dsl.neo4j.password");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_DATABASE.getKey(),
+            "geaflow.dsl.neo4j.database");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_BATCH_SIZE.getKey(),
+            "geaflow.dsl.neo4j.batch.size");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_QUERY.getKey(),
+            "geaflow.dsl.neo4j.query");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_LABEL.getKey(),
+            "geaflow.dsl.neo4j.node.label");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TYPE.getKey(),
+            "geaflow.dsl.neo4j.relationship.type");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_WRITE_MODE.getKey(),
+            "geaflow.dsl.neo4j.write.mode");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_ID_FIELD.getKey(),
+            "geaflow.dsl.neo4j.node.id.field");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_SOURCE_FIELD.getKey(),
+            "geaflow.dsl.neo4j.relationship.source.field");
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TARGET_FIELD.getKey(),
+ "geaflow.dsl.neo4j.relationship.target.field");
+ }
+
+ @Test
+ public void testConfigKeyDefaults() {
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_DATABASE.getDefaultValue(),
+            Neo4jConstants.DEFAULT_DATABASE);
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_BATCH_SIZE.getDefaultValue(),
+            Neo4jConstants.DEFAULT_BATCH_SIZE);
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_LABEL.getDefaultValue(),
+            Neo4jConstants.DEFAULT_NODE_LABEL);
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TYPE.getDefaultValue(),
+            Neo4jConstants.DEFAULT_RELATIONSHIP_TYPE);
+        Assert.assertEquals(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_WRITE_MODE.getDefaultValue(),
+ "node");
+ }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnectorTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnectorTest.java
new file mode 100644
index 000000000..ed2f3dae3
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/test/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableConnectorTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.neo4j;
+
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.dsl.connector.api.TableSink;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class Neo4jTableConnectorTest {
+
+ private Neo4jTableConnector connector;
+ private Configuration config;
+
+ @BeforeMethod
+ public void setUp() {
+ connector = new Neo4jTableConnector();
+ config = new Configuration();
+        config.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_URI, "bolt://localhost:7687");
+ config.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_USERNAME, "neo4j");
+ config.put(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_PASSWORD, "password");
+ }
+
+ @Test
+ public void testGetType() {
+ Assert.assertEquals(connector.getType(), "Neo4j");
+ }
+
+ @Test
+ public void testCreateSource() {
+ TableSource source = connector.createSource(config);
+ Assert.assertNotNull(source);
+ Assert.assertTrue(source instanceof Neo4jTableSource);
+ }
+
+ @Test
+ public void testCreateSink() {
+ TableSink sink = connector.createSink(config);
+ Assert.assertNotNull(sink);
+ Assert.assertTrue(sink instanceof Neo4jTableSink);
+ }
+
+ @Test
+ public void testMultipleSourceInstances() {
+ TableSource source1 = connector.createSource(config);
+ TableSource source2 = connector.createSource(config);
+
+ Assert.assertNotNull(source1);
+ Assert.assertNotNull(source2);
+ Assert.assertNotSame(source1, source2);
+ }
+
+ @Test
+ public void testMultipleSinkInstances() {
+ TableSink sink1 = connector.createSink(config);
+ TableSink sink2 = connector.createSink(config);
+
+ Assert.assertNotNull(sink1);
+ Assert.assertNotNull(sink2);
+ Assert.assertNotSame(sink1, sink2);
+ }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
index b7c9821a6..d45d4800d 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml
@@ -47,6 +47,8 @@
<module>geaflow-dsl-connector-pulsar</module>
<module>geaflow-dsl-connector-random</module>
<module>geaflow-dsl-connector-paimon</module>
+ <module>geaflow-dsl-connector-neo4j</module>
+ <module>geaflow-dsl-connector-elasticsearch</module>
</modules>
<dependencyManagement>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]