This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 580276a8b [Improve][connector-V2-Neo4j]Supports neo4j sink batch write
and update docs (#4841)
580276a8b is described below
commit 580276a8bd05a0d7ec0293ec7aee54c967b17e86
Author: FuYouJ <[email protected]>
AuthorDate: Tue Jun 13 23:25:15 2023 +0800
[Improve][connector-V2-Neo4j]Supports neo4j sink batch write and update
docs (#4841)
---
docs/en/connector-v2/sink/Neo4j.md | 69 ++++++++++---
release-note.md | 1 +
.../seatunnel/neo4j/config/Neo4jQueryInfo.java | 107 +++++++++++++++++++++
.../seatunnel/neo4j/config/Neo4jSinkConfig.java | 13 +++
.../seatunnel/neo4j/config/Neo4jSinkQueryInfo.java | 82 +++++++++++++++-
.../neo4j/config/Neo4jSourceQueryInfo.java | 11 ++-
.../CypherEnum.java} | 18 ++--
.../SinkWriteMode.java} | 7 +-
.../Neo4jConnectorErrorCode.java} | 25 +++--
.../neo4j/internal/SeatunnelRowNeo4jValue.java | 54 +++++++++++
.../connectors/seatunnel/neo4j/sink/Neo4jSink.java | 100 +------------------
.../seatunnel/neo4j/sink/Neo4jSinkWriter.java | 88 +++++++++++++++--
.../seatunnel/neo4j/source/Neo4jSource.java | 88 +----------------
.../seatunnel/e2e/connector/neo4j/Neo4jIT.java | 35 +++++++
.../resources/neo4j/fake_to_neo4j_batch_write.conf | 62 ++++++++++++
15 files changed, 541 insertions(+), 219 deletions(-)
diff --git a/docs/en/connector-v2/sink/Neo4j.md
b/docs/en/connector-v2/sink/Neo4j.md
index 8cfe35f7d..15e88646d 100644
--- a/docs/en/connector-v2/sink/Neo4j.md
+++ b/docs/en/connector-v2/sink/Neo4j.md
@@ -14,19 +14,21 @@ Write data to Neo4j.
## Options
-| name | type | required | default value |
-|----------------------------|--------|----------|---------------|
-| uri | String | Yes | - |
-| username | String | No | - |
-| password | String | No | - |
-| bearer_token | String | No | - |
-| kerberos_ticket | String | No | - |
-| database | String | Yes | - |
-| query | String | Yes | - |
-| queryParamPosition | Object | Yes | - |
-| max_transaction_retry_time | Long | No | 30 |
-| max_connection_timeout | Long | No | 30 |
-| common-options | config | no | - |
+| name | type | required | default value |
+|----------------------------|---------|----------|---------------|
+| uri | String | Yes | - |
+| username | String | No | - |
+| password | String | No | - |
+| max_batch_size | Integer | No | 500 |
+| write_mode | String | No | ONE_BY_ONE |
+| bearer_token | String | No | - |
+| kerberos_ticket | String | No | - |
+| database | String | Yes | - |
+| query | String | Yes | - |
+| queryParamPosition | Object | Yes | - |
+| max_transaction_retry_time | Long | No | 30 |
+| max_connection_timeout | Long | No | 30 |
+| common-options | config | no | - |
### uri [string]
@@ -40,6 +42,20 @@ username of the Neo4j
password of the Neo4j. required if `username` is provided
+### max_batch_size[Integer]
+
+max_batch_size refers to the maximum number of data entries that can be
written in a single transaction when writing to a database.
+
+### write_mode
+
+The default value is "ONE_BY_ONE". Set it to "BATCH" if you want to have the
ability to write in batches, using a query such as:
+
+```cypher
+unwind $ttt as row create (n:Label) set n.name = row.name,n.age = row.age
+```
+
+"ttt" represents a batch of data. "ttt" can be any arbitrary string as long as
it matches the configured "batch_data_variable".
+
### bearer_token [string]
base64 encoded bearer token of the Neo4j. for Auth.
@@ -76,7 +92,7 @@ The maximum amount of time to wait for a TCP connection to be
established (secon
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
-## Example
+## WriteOneByOneExample
```
sink {
@@ -98,9 +114,34 @@ sink {
}
```
+## WriteBatchExample
+> The unwind keyword provided by Cypher supports batch writing, and the
default variable for a batch of data is batch. If you write a batch write
statement, you should declare `unwind $batch as row` in the Cypher query to do something with each row.
+```
+sink {
+ Neo4j {
+ uri = "bolt://localhost:7687"
+ username = "neo4j"
+ password = "neo4j"
+ database = "neo4j"
+ max_batch_size = 1000
+ write_mode = "BATCH"
+
+ max_transaction_retry_time = 3
+ max_connection_timeout = 10
+
+ query = "unwind $batch as row create(n:MyLabel) set n.name =
row.name,n.age = row.age"
+
+ }
+}
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
- Add Neo4j Sink Connector
+### issue #4835
+
+- Sink supports batch write
+
diff --git a/release-note.md b/release-note.md
index 858e38c97..80f1479af 100644
--- a/release-note.md
+++ b/release-note.md
@@ -92,6 +92,7 @@
- [Connector-V2] [Doris] Add a jobId to the doris label to distinguish between
tasks (#4839) (#4853)
- [Connector-v2] [Mongodb]Refactor mongodb connector (#4620)
- [Connector-v2] [Jdbc] Populate primary key when jdbc sink is created using
CatalogTable (#4755)
+- [Connector-v2] [Neo4j] Supports neo4j sink batch write mode (#4835)
- [Transform-V2] Optimize SQL Transform package and Fix Spark type conversion
bug of transform (#4490)
### CI
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
index 4e56da64b..9730bfa11 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
@@ -17,9 +17,31 @@
package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
+
+import org.neo4j.driver.AuthTokens;
+
import lombok.Data;
import java.io.Serializable;
+import java.net.URI;
+
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_BEARER_TOKEN;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_KERBEROS_TICKET;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_CONNECTION_TIMEOUT;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_NEO4J_URI;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_QUERY;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
/**
* Because Neo4jQueryInfo is one of the Neo4jSink's member variable, So
Neo4jQueryInfo need
@@ -29,4 +51,89 @@ import java.io.Serializable;
public abstract class Neo4jQueryInfo implements Serializable {
protected DriverBuilder driverBuilder;
protected String query;
+
+ protected PluginType pluginType;
+
+ public Neo4jQueryInfo(Config config, PluginType pluginType) {
+ this.pluginType = pluginType;
+ this.driverBuilder = prepareDriver(config, pluginType);
+ this.query = prepareQuery(config, pluginType);
+ }
+
+ // This logic was identical in the prepareDriver methods of the source and sink;
+ // the only difference was the pluginType mentioned in the error messages,
+ // so the code was moved here.
+ protected DriverBuilder prepareDriver(Config config, PluginType
pluginType) {
+ final CheckResult uriConfigCheck =
+ CheckConfigUtil.checkAllExists(config, KEY_NEO4J_URI.key(),
KEY_DATABASE.key());
+ final CheckResult authConfigCheck =
+ CheckConfigUtil.checkAtLeastOneExists(
+ config,
+ KEY_USERNAME.key(),
+ KEY_BEARER_TOKEN.key(),
+ KEY_KERBEROS_TICKET.key());
+ final CheckResult mergedConfigCheck =
+ CheckConfigUtil.mergeCheckResults(uriConfigCheck,
authConfigCheck);
+ if (!mergedConfigCheck.isSuccess()) {
+ throw new Neo4jConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ PLUGIN_NAME, pluginType,
mergedConfigCheck.getMsg()));
+ }
+
+ final URI uri = URI.create(config.getString(KEY_NEO4J_URI.key()));
+
+ final DriverBuilder driverBuilder = DriverBuilder.create(uri);
+
+ if (config.hasPath(KEY_USERNAME.key())) {
+ final CheckResult pwParamCheck =
+ CheckConfigUtil.checkAllExists(config, KEY_PASSWORD.key());
+ if (!pwParamCheck.isSuccess()) {
+ throw new Neo4jConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ PLUGIN_NAME, pluginType,
pwParamCheck.getMsg()));
+ }
+ final String username = config.getString(KEY_USERNAME.key());
+ final String password = config.getString(KEY_PASSWORD.key());
+
+ driverBuilder.setUsername(username);
+ driverBuilder.setPassword(password);
+ } else if (config.hasPath(KEY_BEARER_TOKEN.key())) {
+ final String bearerToken =
config.getString(KEY_BEARER_TOKEN.key());
+ AuthTokens.bearer(bearerToken);
+ driverBuilder.setBearerToken(bearerToken);
+ } else {
+ final String kerberosTicket =
config.getString(KEY_KERBEROS_TICKET.key());
+ AuthTokens.kerberos(kerberosTicket);
+ driverBuilder.setBearerToken(kerberosTicket);
+ }
+
+ driverBuilder.setDatabase(config.getString(KEY_DATABASE.key()));
+
+ if (config.hasPath(KEY_MAX_CONNECTION_TIMEOUT.key())) {
+ driverBuilder.setMaxConnectionTimeoutSeconds(
+ config.getLong(KEY_MAX_CONNECTION_TIMEOUT.key()));
+ }
+ if (config.hasPath(KEY_MAX_TRANSACTION_RETRY_TIME.key())) {
+ driverBuilder.setMaxTransactionRetryTimeSeconds(
+ config.getLong(KEY_MAX_TRANSACTION_RETRY_TIME.key()));
+ }
+
+ return driverBuilder;
+ }
+
+ private String prepareQuery(Config config, PluginType pluginType) {
+ CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config,
KEY_QUERY.key());
+ if (!queryConfigCheck.isSuccess()) {
+ throw new Neo4jConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ PLUGIN_NAME, pluginType,
queryConfigCheck.getMsg()));
+ }
+ return config.getString(KEY_QUERY.key());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
index 74726b558..8d8c42b0e 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.neo4j.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.constants.SinkWriteMode;
import java.util.Map;
@@ -29,4 +30,16 @@ public class Neo4jSinkConfig extends Neo4jCommonConfig {
.noDefaultValue()
.withDescription(
"position mapping information for query
parameters. key name is parameter placeholder name. associated value is
position of field in input data row.");
+
+ public static final Option<Integer> MAX_BATCH_SIZE =
+ Options.key("max_batch_size")
+ .intType()
+ .defaultValue(500)
+ .withDescription("neo4j write max batch size");
+ public static final Option<SinkWriteMode> WRITE_MODE =
+ Options.key("write_mode")
+ .enumType(SinkWriteMode.class)
+ .defaultValue(SinkWriteMode.ONE_BY_ONE)
+ .withDescription(
+ "The write mode on the sink end is oneByOne by
default in order to maintain compatibility with previous code.");
}
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
index 44c0c7294..997f7cc8c 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
@@ -17,11 +17,89 @@
package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
-import lombok.Data;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.constants.SinkWriteMode;
+import
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
+
+import lombok.Getter;
+import lombok.Setter;
import java.util.Map;
-@Data
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.MAX_BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.QUERY_PARAM_POSITION;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.WRITE_MODE;
+
+@Getter
+@Setter
public class Neo4jSinkQueryInfo extends Neo4jQueryInfo {
+
private Map<String, Object> queryParamPosition;
+ private Integer maxBatchSize;
+
+ private SinkWriteMode writeMode;
+
+ public boolean batchMode() {
+ return SinkWriteMode.BATCH.equals(writeMode);
+ }
+
+ public Neo4jSinkQueryInfo(Config config) {
+ super(config, PluginType.SINK);
+
+ this.writeMode = prepareWriteMode(config);
+
+ if (SinkWriteMode.BATCH.equals(writeMode)) {
+ prepareBatchWriteConfig(config);
+ } else {
+ prepareOneByOneConfig(config);
+ }
+ }
+
+ private void prepareOneByOneConfig(Config config) {
+
+ CheckResult queryConfigCheck =
+ CheckConfigUtil.checkAllExists(config,
QUERY_PARAM_POSITION.key());
+
+ if (!queryConfigCheck.isSuccess()) {
+ throw new Neo4jConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ PLUGIN_NAME, PluginType.SINK,
queryConfigCheck.getMsg()));
+ }
+
+ // set queryParamPosition
+ this.queryParamPosition =
config.getObject(QUERY_PARAM_POSITION.key()).unwrapped();
+ }
+
+ private void prepareBatchWriteConfig(Config config) {
+
+ // batch size
+ if (config.hasPath(MAX_BATCH_SIZE.key())) {
+ int batchSize = config.getInt(MAX_BATCH_SIZE.key());
+ if (batchSize <= 0) {
+ throw new Neo4jConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ PLUGIN_NAME, PluginType.SINK, "maxBatchSize
must greater than 0"));
+ }
+ this.maxBatchSize = batchSize;
+ } else {
+ this.maxBatchSize = MAX_BATCH_SIZE.defaultValue();
+ }
+ }
+
+ private SinkWriteMode prepareWriteMode(Config config) {
+ if (config.hasPath(WRITE_MODE.key())) {
+ return config.getEnum(SinkWriteMode.class, WRITE_MODE.key());
+ }
+ return WRITE_MODE.defaultValue();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
index 8288ad01a..cd98f54f9 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
@@ -17,4 +17,13 @@
package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
-public class Neo4jSourceQueryInfo extends Neo4jQueryInfo {}
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.common.constants.PluginType;
+
+public class Neo4jSourceQueryInfo extends Neo4jQueryInfo {
+
+ public Neo4jSourceQueryInfo(Config pluginConfig) {
+ super(pluginConfig, PluginType.SOURCE);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/CypherEnum.java
similarity index 65%
copy from
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
copy to
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/CypherEnum.java
index 44c0c7294..01c3cf4ce 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/CypherEnum.java
@@ -15,13 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+package org.apache.seatunnel.connectors.seatunnel.neo4j.constants;
-import lombok.Data;
+public enum CypherEnum {
+ BATCH("batch", "a variable in cypher that represents a batch of data");
+ private final String value;
+ private final String description;
-import java.util.Map;
+ CypherEnum(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
-@Data
-public class Neo4jSinkQueryInfo extends Neo4jQueryInfo {
- private Map<String, Object> queryParamPosition;
+ public String getValue() {
+ return value;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/SinkWriteMode.java
similarity index 86%
copy from
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
copy to
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/SinkWriteMode.java
index 8288ad01a..4975fb113 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/SinkWriteMode.java
@@ -15,6 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+package org.apache.seatunnel.connectors.seatunnel.neo4j.constants;
-public class Neo4jSourceQueryInfo extends Neo4jQueryInfo {}
+public enum SinkWriteMode {
+ ONE_BY_ONE,
+ BATCH
+}
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/exception/Neo4jConnectorErrorCode.java
similarity index 56%
copy from
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
copy to
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/exception/Neo4jConnectorErrorCode.java
index 44c0c7294..93f83131e 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/exception/Neo4jConnectorErrorCode.java
@@ -14,14 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.seatunnel.connectors.seatunnel.neo4j.exception;
-package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
-import lombok.Data;
+public enum Neo4jConnectorErrorCode implements SeaTunnelErrorCode {
+ DATE_BASE_ERROR("NEO4J-01", "Neo4j Database Error");
+ private final String code;
+ private final String description;
-import java.util.Map;
+ Neo4jConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
-@Data
-public class Neo4jSinkQueryInfo extends Neo4jQueryInfo {
- private Map<String, Object> queryParamPosition;
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/internal/SeatunnelRowNeo4jValue.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/internal/SeatunnelRowNeo4jValue.java
new file mode 100644
index 000000000..667239ea2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/internal/SeatunnelRowNeo4jValue.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.neo4j.internal;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.neo4j.driver.Value;
+import org.neo4j.driver.Values;
+import org.neo4j.driver.internal.AsValue;
+import org.neo4j.driver.internal.util.Iterables;
+import org.neo4j.driver.internal.value.MapValue;
+
+import java.util.Map;
+
+/**
+ * This class includes the seatunnelRow and implements the
neo4j.driver.internal.AsValue interface.
+ * This class will be able to convert to neo4j.driver.Value quickly without
any extra effort.
+ */
+public class SeatunnelRowNeo4jValue implements AsValue {
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final SeaTunnelRow seaTunnelRow;
+
+ public SeatunnelRowNeo4jValue(SeaTunnelRowType seaTunnelRowType,
SeaTunnelRow seaTunnelRow) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.seaTunnelRow = seaTunnelRow;
+ }
+
+ @Override
+ public Value asValue() {
+ int length = seaTunnelRowType.getTotalFields();
+ Map<String, Value> valueMap = Iterables.newHashMapWithSize(length);
+ for (int i = 0; i < length; i++) {
+ String name = seaTunnelRowType.getFieldName(i);
+ Value value = Values.value(seaTunnelRow.getField(i));
+ valueMap.put(name, value);
+ }
+ return new MapValue(valueMap);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
index adba115ef..4ab0070d7 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -20,43 +20,24 @@ package
org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.neo4j.config.DriverBuilder;
import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
-import
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
-
-import org.neo4j.driver.AuthTokens;
import com.google.auto.service.AutoService;
import java.io.IOException;
-import java.net.URI;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_BEARER_TOKEN;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_KERBEROS_TICKET;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_NEO4J_URI;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_QUERY;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_USERNAME;
import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.PLUGIN_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.QUERY_PARAM_POSITION;
@AutoService(SeaTunnelSink.class)
public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void,
Void> {
private SeaTunnelRowType rowType;
- private final Neo4jSinkQueryInfo neo4JSinkQueryInfo = new
Neo4jSinkQueryInfo();
+ private Neo4jSinkQueryInfo neo4JSinkQueryInfo;
@Override
public String getPluginName() {
@@ -65,82 +46,7 @@ public class Neo4jSink implements
SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
@Override
public void prepare(Config config) throws PrepareFailException {
- neo4JSinkQueryInfo.setDriverBuilder(prepareDriver(config));
-
- final CheckResult queryConfigCheck =
- CheckConfigUtil.checkAllExists(config, KEY_QUERY.key(),
QUERY_PARAM_POSITION.key());
- if (!queryConfigCheck.isSuccess()) {
- throw new Neo4jConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- PLUGIN_NAME, PluginType.SINK,
queryConfigCheck.getMsg()));
- }
- neo4JSinkQueryInfo.setQuery(config.getString(KEY_QUERY.key()));
- neo4JSinkQueryInfo.setQueryParamPosition(
- config.getObject(QUERY_PARAM_POSITION.key()).unwrapped());
- }
-
- private DriverBuilder prepareDriver(Config config) {
- final CheckResult uriConfigCheck =
- CheckConfigUtil.checkAllExists(config, KEY_NEO4J_URI.key(),
KEY_DATABASE.key());
- final CheckResult authConfigCheck =
- CheckConfigUtil.checkAtLeastOneExists(
- config,
- KEY_USERNAME.key(),
- KEY_BEARER_TOKEN.key(),
- KEY_KERBEROS_TICKET.key());
- final CheckResult mergedConfigCheck =
- CheckConfigUtil.mergeCheckResults(uriConfigCheck,
authConfigCheck);
- if (!mergedConfigCheck.isSuccess()) {
- throw new Neo4jConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- PLUGIN_NAME, PluginType.SINK,
mergedConfigCheck.getMsg()));
- }
-
- final URI uri = URI.create(config.getString(KEY_NEO4J_URI.key()));
-
- final DriverBuilder driverBuilder = DriverBuilder.create(uri);
-
- if (config.hasPath(KEY_USERNAME.key())) {
- final CheckResult pwParamCheck =
- CheckConfigUtil.checkAllExists(config, KEY_PASSWORD.key());
- if (!pwParamCheck.isSuccess()) {
- throw new Neo4jConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- PLUGIN_NAME, PluginType.SINK,
pwParamCheck.getMsg()));
- }
- final String username = config.getString(KEY_USERNAME.key());
- final String password = config.getString(KEY_PASSWORD.key());
-
- driverBuilder.setUsername(username);
- driverBuilder.setPassword(password);
- } else if (config.hasPath(KEY_BEARER_TOKEN.key())) {
- final String bearerToken =
config.getString(KEY_BEARER_TOKEN.key());
- AuthTokens.bearer(bearerToken);
- driverBuilder.setBearerToken(bearerToken);
- } else {
- final String kerberosTicket =
config.getString(KEY_KERBEROS_TICKET.key());
- AuthTokens.kerberos(kerberosTicket);
- driverBuilder.setBearerToken(kerberosTicket);
- }
-
- driverBuilder.setDatabase(config.getString(KEY_DATABASE.key()));
-
- if (config.hasPath(KEY_MAX_CONNECTION_TIMEOUT.key())) {
- driverBuilder.setMaxConnectionTimeoutSeconds(
- config.getLong(KEY_MAX_CONNECTION_TIMEOUT.key()));
- }
- if (config.hasPath(KEY_MAX_TRANSACTION_RETRY_TIME.key())) {
- driverBuilder.setMaxTransactionRetryTimeSeconds(
- config.getLong(KEY_MAX_TRANSACTION_RETRY_TIME.key()));
- }
-
- return driverBuilder;
+ this.neo4JSinkQueryInfo = new Neo4jSinkQueryInfo(config);
}
@Override
@@ -156,6 +62,6 @@ public class Neo4jSink implements
SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
@Override
public SinkWriter<SeaTunnelRow, Void, Void>
createWriter(SinkWriter.Context context)
throws IOException {
- return new Neo4jSinkWriter(neo4JSinkQueryInfo);
+ return new Neo4jSinkWriter(neo4JSinkQueryInfo, rowType);
}
}
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
index 528fe38a7..3cc6e82bd 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
@@ -17,22 +17,37 @@
package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.constants.CypherEnum;
+import
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.neo4j.internal.SeatunnelRowNeo4jValue;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Query;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.Values;
+import org.neo4j.driver.exceptions.ClientException;
+import org.neo4j.driver.exceptions.Neo4jException;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
+
@Slf4j
public class Neo4jSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void> {
@@ -40,17 +55,33 @@ public class Neo4jSinkWriter implements
SinkWriter<SeaTunnelRow, Void, Void> {
private final transient Driver driver;
private final transient Session session;
- public Neo4jSinkWriter(Neo4jSinkQueryInfo neo4jSinkQueryInfo) {
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final List<SeatunnelRowNeo4jValue> writeBuffer;
+ private final Integer maxBatchSize;
+
+ public Neo4jSinkWriter(
+ Neo4jSinkQueryInfo neo4jSinkQueryInfo, SeaTunnelRowType
seaTunnelRowType) {
this.neo4jSinkQueryInfo = neo4jSinkQueryInfo;
this.driver = this.neo4jSinkQueryInfo.getDriverBuilder().build();
this.session =
driver.session(
SessionConfig.forDatabase(
neo4jSinkQueryInfo.getDriverBuilder().getDatabase()));
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.maxBatchSize =
Optional.ofNullable(neo4jSinkQueryInfo.getMaxBatchSize()).orElse(0);
+ this.writeBuffer = new ArrayList<>(maxBatchSize);
}
@Override
public void write(SeaTunnelRow element) throws IOException {
+ if (neo4jSinkQueryInfo.batchMode()) {
+ writeByBatchSize(element);
+ } else {
+ writeOneByOne(element);
+ }
+ }
+
+ private void writeOneByOne(SeaTunnelRow element) {
final Map<String, Object> queryParamPosition =
neo4jSinkQueryInfo.getQueryParamPosition().entrySet().stream()
.collect(
@@ -58,11 +89,47 @@ public class Neo4jSinkWriter implements
SinkWriter<SeaTunnelRow, Void, Void> {
Map.Entry::getKey,
e -> element.getField((Integer)
e.getValue())));
final Query query = new Query(neo4jSinkQueryInfo.getQuery(),
queryParamPosition);
- session.writeTransaction(
- tx -> {
- tx.run(query);
- return null;
- });
+ writeByQuery(query);
+ }
+
+ private void writeByBatchSize(SeaTunnelRow element) {
+ writeBuffer.add(new SeatunnelRowNeo4jValue(seaTunnelRowType, element));
+ tryWriteByBatchSize();
+ }
+
+ private void tryWriteByBatchSize() {
+ if (!writeBuffer.isEmpty() && writeBuffer.size() >= maxBatchSize) {
+ Query query = batchQuery();
+ writeByQuery(query);
+ writeBuffer.clear();
+ }
+ }
+
+ private Query batchQuery() {
+ try {
+ Value batchValues = Values.parameters(CypherEnum.BATCH.getValue(),
writeBuffer);
+ return new Query(neo4jSinkQueryInfo.getQuery(), batchValues);
+ } catch (ClientException e) {
+ log.error("Failed to build cypher statement", e);
+ throw new Neo4jConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ PLUGIN_NAME, PluginType.SINK, e.getMessage()));
+ }
+ }
+
+ private void writeByQuery(Query query) {
+ try {
+ session.writeTransaction(
+ tx -> {
+ tx.run(query);
+ return null;
+ });
+ } catch (Neo4jException e) {
+ throw new Neo4jConnectorException(
+ Neo4jConnectorErrorCode.DATE_BASE_ERROR, e.getMessage());
+ }
}
@Override
@@ -75,7 +142,16 @@ public class Neo4jSinkWriter implements
SinkWriter<SeaTunnelRow, Void, Void> {
@Override
public void close() throws IOException {
+ flushWriteBuffer();
session.close();
driver.close();
}
+
+ private void flushWriteBuffer() {
+ if (!writeBuffer.isEmpty()) {
+ Query query = batchQuery();
+ writeByQuery(query);
+ writeBuffer.clear();
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
index 2e5f88705..12100afe2 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
@@ -34,33 +34,19 @@ import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import org.apache.seatunnel.connectors.seatunnel.neo4j.config.DriverBuilder;
import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceQueryInfo;
import
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
-import org.neo4j.driver.AuthTokens;
-
import com.google.auto.service.AutoService;
-import java.net.URI;
-
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_BEARER_TOKEN;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_KERBEROS_TICKET;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_NEO4J_URI;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_QUERY;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_USERNAME;
import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.PLUGIN_NAME;
@AutoService(SeaTunnelSource.class)
public class Neo4jSource extends AbstractSingleSplitSource<SeaTunnelRow>
implements SupportColumnProjection {
- private final Neo4jSourceQueryInfo neo4jSourceQueryInfo = new
Neo4jSourceQueryInfo();
+ private Neo4jSourceQueryInfo neo4jSourceQueryInfo;
private SeaTunnelRowType rowType;
@Override
@@ -70,11 +56,9 @@ public class Neo4jSource extends
AbstractSingleSplitSource<SeaTunnelRow>
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- neo4jSourceQueryInfo.setDriverBuilder(prepareDriver(pluginConfig));
final CheckResult configCheck =
- CheckConfigUtil.checkAllExists(
- pluginConfig, KEY_QUERY.key(),
CatalogTableUtil.SCHEMA.key());
+ CheckConfigUtil.checkAllExists(pluginConfig,
CatalogTableUtil.SCHEMA.key());
if (!configCheck.isSuccess()) {
throw new Neo4jConnectorException(
@@ -85,8 +69,8 @@ public class Neo4jSource extends
AbstractSingleSplitSource<SeaTunnelRow>
PluginType.SOURCE,
configCheck.getMsg()));
}
- neo4jSourceQueryInfo.setQuery(pluginConfig.getString(KEY_QUERY.key()));
+ this.neo4jSourceQueryInfo = new Neo4jSourceQueryInfo(pluginConfig);
this.rowType =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
}
@@ -105,70 +89,4 @@ public class Neo4jSource extends
AbstractSingleSplitSource<SeaTunnelRow>
SingleSplitReaderContext readerContext) throws Exception {
return new Neo4jSourceReader(readerContext, neo4jSourceQueryInfo,
rowType);
}
-
- private DriverBuilder prepareDriver(Config config) {
- final CheckResult uriConfigCheck =
- CheckConfigUtil.checkAllExists(config, KEY_NEO4J_URI.key(),
KEY_DATABASE.key());
- final CheckResult authConfigCheck =
- CheckConfigUtil.checkAtLeastOneExists(
- config,
- KEY_USERNAME.key(),
- KEY_BEARER_TOKEN.key(),
- KEY_KERBEROS_TICKET.key());
- final CheckResult mergedConfigCheck =
- CheckConfigUtil.mergeCheckResults(uriConfigCheck,
authConfigCheck);
- if (!mergedConfigCheck.isSuccess()) {
- throw new Neo4jConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- Neo4jSourceConfig.PLUGIN_NAME,
- PluginType.SOURCE,
- mergedConfigCheck.getMsg()));
- }
-
- final URI uri = URI.create(config.getString(KEY_NEO4J_URI.key()));
-
- final DriverBuilder driverBuilder = DriverBuilder.create(uri);
-
- if (config.hasPath(KEY_USERNAME.key())) {
- final CheckResult pwParamCheck =
- CheckConfigUtil.checkAllExists(config, KEY_PASSWORD.key());
- if (!pwParamCheck.isSuccess()) {
- throw new Neo4jConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- Neo4jSourceConfig.PLUGIN_NAME,
- PluginType.SOURCE,
- pwParamCheck.getMsg()));
- }
- final String username = config.getString(KEY_USERNAME.key());
- final String password = config.getString(KEY_PASSWORD.key());
-
- driverBuilder.setUsername(username);
- driverBuilder.setPassword(password);
- } else if (config.hasPath(KEY_BEARER_TOKEN.key())) {
- final String bearerToken =
config.getString(KEY_BEARER_TOKEN.key());
- AuthTokens.bearer(bearerToken);
- driverBuilder.setBearerToken(bearerToken);
- } else {
- final String kerberosTicket =
config.getString(KEY_KERBEROS_TICKET.key());
- AuthTokens.kerberos(kerberosTicket);
- driverBuilder.setBearerToken(kerberosTicket);
- }
-
- driverBuilder.setDatabase(config.getString(KEY_DATABASE.key()));
-
- if (config.hasPath(KEY_MAX_CONNECTION_TIMEOUT.key())) {
- driverBuilder.setMaxConnectionTimeoutSeconds(
- config.getLong(KEY_MAX_CONNECTION_TIMEOUT.key()));
- }
- if (config.hasPath(KEY_MAX_TRANSACTION_RETRY_TIME.key())) {
- driverBuilder.setMaxTransactionRetryTimeSeconds(
- config.getLong(KEY_MAX_TRANSACTION_RETRY_TIME.key()));
- }
-
- return driverBuilder;
- }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/java/org/apache/seatunnel/e2e/connector/neo4j/Neo4jIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/java/org/apache/seatunnel/e2e/connector/neo4j/Neo4jIT.java
index a99d79c46..d70b550d4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/java/org/apache/seatunnel/e2e/connector/neo4j/Neo4jIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/java/org/apache/seatunnel/e2e/connector/neo4j/Neo4jIT.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.TestTemplate;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
@@ -54,12 +55,15 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.neo4j.driver.Values.parameters;
@Slf4j
public class Neo4jIT extends TestSuiteBase implements TestResource {
+ private static final int FAKE_ROW_NUM = 1000;
+
private static final String CONTAINER_IMAGE = "neo4j:5.6.0";
private static final String CONTAINER_HOST = "neo4j-host";
private static final int HTTP_PORT = 7474;
@@ -153,6 +157,37 @@ public class Neo4jIT extends TestSuiteBase implements
TestResource {
assertEquals(Float.MAX_VALUE, tt.get("float").asFloat());
}
+ @TestTemplate
+ public void testBatchWrite(TestContainer container) throws IOException,
InterruptedException {
+ // clean test data before test
+ final Result checkExists = neo4jSession.run("MATCH (n:BatchLabel)
RETURN n limit 1");
+ if (checkExists.hasNext()) {
+ neo4jSession.run("MATCH (n:BatchLabel) delete n");
+ }
+
+ // unwind $batch as row create(n:BatchLabel) set n.name =
row.name,n.age = row.age
+ Container.ExecResult execResult =
+ container.executeJob("/neo4j/fake_to_neo4j_batch_write.conf");
+ // then
+ Assertions.assertEquals(0, execResult.getExitCode());
+ final Result result = neo4jSession.run("MATCH (n:BatchLabel) RETURN
n");
+ // nodes
+ assertTrue(result.hasNext());
+ int cnt = 0;
+ // verify the attributes of the node
+ while (result.hasNext()) {
+ // Do not remove the import of org.neo4j.driver.Record; removing it can
cause the code not to compile on
+ // Java 14+.
+ Record r = result.next();
+ String name = r.get("n").get("name").asString();
+ assertNotNull(name);
+ Object age = r.get("n").get("age").asObject();
+ assertNotNull(age);
+ cnt++;
+ }
+ assertEquals(FAKE_ROW_NUM, cnt);
+ }
+
@AfterAll
@Override
public void tearDown() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
new file mode 100644
index 000000000..e1d9fed6f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ parallelism = 1
+ row.num = 1000
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Neo4j {
+ uri = "neo4j://neo4j-host:7687"
+ username = "neo4j"
+ password = "Test@12343"
+ database = "neo4j"
+ # Set it to 101 for testing code only.
+ max_batch_size = 101
+ write_mode = "BATCH"
+
+ max_transaction_retry_time = 3
+ max_connection_timeout = 1
+
+ query = "unwind $batch as row create(n:BatchLabel) set n.name =
row.name,n.age = row.age"
+ }
+}
\ No newline at end of file