This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d7fa9afdfe [Feature][Connector] Add druid sink connector (#6346)
d7fa9afdfe is described below
commit d7fa9afdfe95633f73fb66c877f954b8a29f8579
Author: TaoZex <[email protected]>
AuthorDate: Wed Jun 19 14:29:42 2024 +0800
[Feature][Connector] Add druid sink connector (#6346)
---
config/plugin_config | 1 +
docs/en/connector-v2/sink/Druid.md | 67 ++++++
plugin-mapping.properties | 1 +
seatunnel-connectors-v2/connector-druid/pom.xml | 57 +++++
.../connectors/druid/config/DruidConfig.java | 44 ++++
.../druid/exception/DruidConnectorException.java | 38 +++
.../seatunnel/connectors/druid/sink/DruidSink.java | 61 +++++
.../connectors/druid/sink/DruidSinkFactory.java | 50 ++++
.../connectors/druid/sink/DruidWriter.java | 255 +++++++++++++++++++++
.../seatunnel/druid/DruidFactoryTest.java | 31 +++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 +
.../connector-druid-e2e/pom.xml | 54 +++++
.../seatunnel/e2e/connector/druid/DruidIT.java | 148 ++++++++++++
.../src/test/resources/docker-compose.yml | 141 ++++++++++++
.../src/test/resources/environment | 53 +++++
.../src/test/resources/fakesource_to_druid.conf | 69 ++++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
18 files changed, 1078 insertions(+)
diff --git a/config/plugin_config b/config/plugin_config
index 42fc280a65..b18d893e67 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -67,6 +67,7 @@ connector-openmldb
connector-pulsar
connector-rabbitmq
connector-redis
+connector-druid
connector-s3-redshift
connector-sentry
connector-slack
diff --git a/docs/en/connector-v2/sink/Druid.md
b/docs/en/connector-v2/sink/Druid.md
new file mode 100644
index 0000000000..0d4783b03a
--- /dev/null
+++ b/docs/en/connector-v2/sink/Druid.md
@@ -0,0 +1,67 @@
+# Druid
+
+> Druid sink connector
+
+## Description
+
+Write data to Druid
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+
+## Data Type Mapping
+
+| SeaTunnel Data Type | Druid Data Type |
+|---------------------|-----------------|
+| TINYINT | LONG |
+| SMALLINT | LONG |
+| INT | LONG |
+| BIGINT | LONG |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| DECIMAL | DOUBLE |
+| STRING | STRING |
+| BOOLEAN | STRING |
+| TIMESTAMP | STRING |
+
+## Options
+
+| name | type | required | default value |
+|----------------|--------|----------|---------------|
+| coordinatorUrl | string | yes | - |
+| datasource | string | yes | - |
+| batchSize | int | no | 10000 |
+| common-options | | no | - |
+
+### coordinatorUrl [string]
+
+The coordinatorUrl host and port of Druid, example: "myHost:8888"
+
+### datasource [string]
+
+The datasource name you want to write, example: "seatunnel"
+
+### batchSize [int]
+
+The number of rows flushed to Druid per batch. Default value is `1024`.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
+
+## Example
+
+```hocon
+Druid {
+ coordinatorUrl = "testHost:8888"
+ datasource = "seatunnel"
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Druid sink connector
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 411e42b880..25dc239f99 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -119,6 +119,7 @@ seatunnel.source.AmazonSqs = connector-amazonsqs
seatunnel.sink.AmazonSqs = connector-amazonsqs
seatunnel.source.Paimon = connector-paimon
seatunnel.sink.Paimon = connector-paimon
+seatunnel.sink.Druid = connector-druid
seatunnel.source.Easysearch = connector-easysearch
seatunnel.sink.Easysearch = connector-easysearch
seatunnel.source.Postgres-CDC = connector-cdc-postgres
diff --git a/seatunnel-connectors-v2/connector-druid/pom.xml
b/seatunnel-connectors-v2/connector-druid/pom.xml
new file mode 100644
index 0000000000..c53ca12f8f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-druid/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-druid</artifactId>
+ <name>SeaTunnel : Connectors V2 : Druid</name>
+
+ <properties>
+ <druid.version>24.0.1</druid.version>
+ <httpclient.version>4.5.13</httpclient.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${druid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-indexing-service</artifactId>
+ <version>${druid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/config/DruidConfig.java
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/config/DruidConfig.java
new file mode 100644
index 0000000000..a2883143a3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/config/DruidConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class DruidConfig {
+ public static final Integer BATCH_SIZE_DEFAULT = 10000;
+
+ public static Option<String> COORDINATOR_URL =
+ Options.key("coordinatorUrl")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The coordinatorUrl host and port of
Druid.");
+
+ public static Option<String> DATASOURCE =
+ Options.key("datasource")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The datasource name need to write.");
+
+ public static Option<Integer> BATCH_SIZE =
+ Options.key("batchSize")
+ .intType()
+ .defaultValue(BATCH_SIZE_DEFAULT)
+ .withDescription("The batch size of the druid write.");
+}
diff --git
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/exception/DruidConnectorException.java
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/exception/DruidConnectorException.java
new file mode 100644
index 0000000000..23c7348f7b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/exception/DruidConnectorException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class DruidConnectorException extends SeaTunnelRuntimeException {
+
+ public DruidConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public DruidConnectorException(
+ SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage,
Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public DruidConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
new file mode 100644
index 0000000000..318f3a1bd0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import java.io.IOException;
+
+import static
org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
+import static
org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
+
+public class DruidSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+ private ReadonlyConfig config;
+ private CatalogTable catalogTable;
+ private SeaTunnelRowType seaTunnelRowType;
+
+ @Override
+ public String getPluginName() {
+ return "Druid";
+ }
+
+ public DruidSink(ReadonlyConfig config, CatalogTable table) {
+ this.config = config;
+ this.catalogTable = table;
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+ }
+
+ @Override
+ public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
+ throws IOException {
+ return new DruidWriter(
+ seaTunnelRowType,
+ config.get(COORDINATOR_URL),
+ config.get(DATASOURCE),
+ config.get(BATCH_SIZE));
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
new file mode 100644
index 0000000000..44e887810e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import static
org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
+import static
org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
+
+@AutoService(Factory.class)
+public class DruidSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Druid";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(COORDINATOR_URL,
DATASOURCE).build();
+ }
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ CatalogTable catalogTable = context.getCatalogTable();
+ return () -> new DruidSink(context.getOptions(), catalogTable);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
new file mode 100644
index 0000000000..3f7709b51d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.druid.exception.DruidConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+
+public class DruidWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DruidWriter.class);
+
+ private static final String DEFAULT_LINE_DELIMITER = "\n";
+ private static final String DEFAULT_FIELD_DELIMITER = ",";
+ private static final String TIMESTAMP_SPEC_COLUMN_NAME = "timestamp";
+ private static final String DRUID_ENDPOINT = "/druid/indexer/v1/task";
+
+ private int batchSize;
+ private int currentBatchSize = 0;
+
+ private final DataSchema dataSchema;
+
+ private final long processTime;
+ private final transient StringBuffer data;
+
+ private final CloseableHttpClient httpClient;
+ private final ObjectMapper mapper;
+ private final String coordinatorUrl;
+ private final String datasource;
+ private final SeaTunnelRowType seaTunnelRowType;
+
+ public DruidWriter(
+ SeaTunnelRowType seaTunnelRowType,
+ String coordinatorUrl,
+ String datasource,
+ int batchSize) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.coordinatorUrl = coordinatorUrl;
+ this.datasource = datasource;
+ this.batchSize = batchSize;
+ this.mapper = provideDruidSerializer();
+ this.httpClient = HttpClients.createDefault();
+ this.dataSchema = provideDruidDataSchema();
+ this.processTime = System.currentTimeMillis();
+ this.data = new StringBuffer();
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ final StringJoiner joiner = new StringJoiner(DEFAULT_FIELD_DELIMITER,
"", "");
+ for (int i = 0; i < element.getArity(); i++) {
+ final Object v = element.getField(i);
+ if (v != null) {
+ joiner.add(v.toString());
+ }
+ }
+ // timestamp column is a required field to add in Druid.
+ // See
https://druid.apache.org/docs/24.0.0/ingestion/data-model.html#primary-timestamp
+ joiner.add(String.valueOf(processTime));
+ data.append(joiner);
+ data.append(DEFAULT_LINE_DELIMITER);
+ currentBatchSize++;
+ if (currentBatchSize >= batchSize) {
+ flush();
+ currentBatchSize = 0;
+ }
+ }
+
+ public void flush() throws IOException {
+ final ParallelIndexIOConfig ioConfig = provideDruidIOConfig(data);
+ final ParallelIndexSupervisorTask indexTask =
provideIndexTask(ioConfig);
+ final String inputJSON = provideInputJSONString(indexTask);
+ String uri = new String("http://" + this.coordinatorUrl +
DRUID_ENDPOINT);
+ HttpPost post = new HttpPost(uri);
+ post.setHeader("Content-Type", "application/json");
+ post.setHeader("Accept", "application/json, text/plain, */*");
+ post.setEntity(new StringEntity(inputJSON));
+
+ try (CloseableHttpResponse response = httpClient.execute(post)) {
+ String responseBody =
+ response.getEntity() != null ?
response.getEntity().toString() : "";
+ LOG.info("Druid write task has been sent, and the response is {}",
responseBody);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ if (httpClient != null) {
+ httpClient.close();
+ }
+ }
+
+ private ObjectMapper provideDruidSerializer() {
+ final ObjectMapper mapper = new ObjectMapper();
+ mapper.registerModule(new JodaModule());
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+ mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+ mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+ mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+ mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
+ mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+ return mapper;
+ }
+
+ /**
+ * One necessary information to provide is DimensionSchema list, which
states data type of
+ * columns. More details in
https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html
+ */
+ private DataSchema provideDruidDataSchema() {
+ final List<DimensionSchema> dimensionSchemas =
transformToDimensionSchema();
+ return new DataSchema(
+ datasource,
+ new TimestampSpec(TIMESTAMP_SPEC_COLUMN_NAME, "auto", null),
+ new DimensionsSpec(dimensionSchemas),
+ null,
+ new UniformGranularitySpec(Granularities.HOUR,
Granularities.MINUTE, false, null),
+ null);
+ }
+
+ private List<DimensionSchema> transformToDimensionSchema() {
+ List<DimensionSchema> dimensionSchemas = new ArrayList<>();
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+ for (int i = 0; i < fieldNames.length; i++) {
+ String columnName = fieldNames[i];
+ switch (fieldTypes[i].getSqlType()) {
+ case BOOLEAN:
+ case TIMESTAMP:
+ case STRING:
+ dimensionSchemas.add(new
StringDimensionSchema(columnName));
+ break;
+ case FLOAT:
+ dimensionSchemas.add(new FloatDimensionSchema(columnName));
+ break;
+ case DECIMAL:
+ case DOUBLE:
+ dimensionSchemas.add(new
DoubleDimensionSchema(columnName));
+ break;
+ case TINYINT:
+ case SMALLINT:
+ case INT:
+ case BIGINT:
+ dimensionSchemas.add(new LongDimensionSchema(columnName));
+ break;
+ default:
+ throw new DruidConnectorException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+ "Unsupported data type " +
seaTunnelRowType.getFieldType(i));
+ }
+ }
+ return dimensionSchemas;
+ }
+
+ ParallelIndexIOConfig provideDruidIOConfig(final StringBuffer data) {
+ List<String> formatList =
+
Arrays.stream(seaTunnelRowType.getFieldNames()).collect(Collectors.toList());
+ formatList.add(TIMESTAMP_SPEC_COLUMN_NAME);
+ return new ParallelIndexIOConfig(
+ null,
+ new InlineInputSource(data.toString()),
+ new CsvInputFormat(formatList, DEFAULT_LINE_DELIMITER, null,
false, 0),
+ false,
+ null);
+ }
+
+ /**
+ * Provide ParallelIndexSupervisorTask that can run multiple indexing
tasks concurrently. See
+ * more information in
https://druid.apache.org/docs/latest/ingestion/native-batch.html
+ */
+ @VisibleForTesting
+ ParallelIndexSupervisorTask provideIndexTask(final ParallelIndexIOConfig
ioConfig) {
+ return new ParallelIndexSupervisorTask(
+ null, null, null, new ParallelIndexIngestionSpec(dataSchema,
ioConfig, null), null);
+ }
+
+ /**
+ * Provide JSON to be sent via HTTP request. Please see payload example in
+ * https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html
+ */
+ String provideInputJSONString(final ParallelIndexSupervisorTask indexTask)
+ throws JsonProcessingException {
+ String taskJSON = mapper.writeValueAsString(indexTask);
+ final ObjectNode jsonObject = (ObjectNode) mapper.readTree(taskJSON);
+ jsonObject.remove("id");
+ jsonObject.remove("groupId");
+ jsonObject.remove("resource");
+
+ final ObjectNode spec = (ObjectNode) jsonObject.get("spec");
+ spec.remove("tuningConfig");
+ jsonObject.put("spec", spec);
+ taskJSON = jsonObject.toString();
+ return taskJSON;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-druid/src/test/java/org/apache/seatunnel/connectors/seatunnel/druid/DruidFactoryTest.java
b/seatunnel-connectors-v2/connector-druid/src/test/java/org/apache/seatunnel/connectors/seatunnel/druid/DruidFactoryTest.java
new file mode 100644
index 0000000000..b5b40bb3ec
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-druid/src/test/java/org/apache/seatunnel/connectors/seatunnel/druid/DruidFactoryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid;
+
+import org.apache.seatunnel.connectors.druid.sink.DruidSinkFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class DruidFactoryTest {
+ @Test
+ public void optionRuleTest() {
+ Assertions.assertNotNull((new DruidSinkFactory()).optionRule());
+ }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index d1e5af9ee6..0498ff4539 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -67,6 +67,7 @@
<module>connector-rabbitmq</module>
<module>connector-openmldb</module>
<module>connector-doris</module>
+ <module>connector-druid</module>
<module>connector-maxcompute</module>
<module>connector-tdengine</module>
<module>connector-selectdb-cloud</module>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 59ce612230..fe75ace87a 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -227,6 +227,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-druid</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-jdbc</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/pom.xml
new file mode 100644
index 0000000000..2531a70f48
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-druid-e2e</artifactId>
+ <name>SeaTunnel : E2E : Connector V2 : Druid</name>
+
+ <properties>
+ <druid.version>24.0.1</druid.version>
+ <httpclient.version>4.5.13</httpclient.version>
+ </properties>
+
+ <dependencies>
+ <!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-druid</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
new file mode 100644
index 0000000000..1639636b85
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.druid;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {TestContainerId.SPARK_2_4},
+ disabledReason = "The RoaringBitmap version is not compatible in
docker container")
+public class DruidIT extends TestSuiteBase implements TestResource {
+
+ private static final String datasource = "testDataSource";
+ private static final String sqlQuery = "SELECT * FROM " + datasource;
+ private static final String DRUID_SERVICE_NAME = "router";
+ private static final int DRUID_SERVICE_PORT = 8888;
+ private DockerComposeContainer environment;
+ private String coordinatorURL;
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ environment =
+ new DockerComposeContainer(new
File("src/test/resources/docker-compose.yml"))
+ .withExposedService(
+ DRUID_SERVICE_NAME,
+ DRUID_SERVICE_PORT,
+ Wait.forListeningPort()
+
.withStartupTimeout(Duration.ofSeconds(360)));
+ environment.start();
+ changeCoordinatorURLConf();
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ environment.close();
+ }
+
+ @TestTemplate
+ public void testDruidSink(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
container.executeJob("/fakesource_to_druid.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ while (true) {
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
+ HttpPost request = new HttpPost("http://" + coordinatorURL +
"/druid/v2/sql");
+ String jsonRequest = "{\"query\": \"" + sqlQuery + "\"}";
+ StringEntity entity = new StringEntity(jsonRequest);
+ entity.setContentType("application/json");
+ request.setEntity(entity);
+ HttpResponse response = client.execute(request);
+ String responseBody =
EntityUtils.toString(response.getEntity());
+ String expectedDataRow1 =
+
"\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3";
+ String expectedDataRow2 =
+
"\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999";
+ String expectedDataRow3 =
+
"\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489";
+ String expectedDataRow4 =
+
"\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012";
+
+ if (!responseBody.contains("errorMessage")) {
+ // Check sink data
+
Assertions.assertEquals(responseBody.contains(expectedDataRow1), true);
+
Assertions.assertEquals(responseBody.contains(expectedDataRow2), true);
+
Assertions.assertEquals(responseBody.contains(expectedDataRow3), true);
+
Assertions.assertEquals(responseBody.contains(expectedDataRow4), true);
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ }
+ }
+
+ private void changeCoordinatorURLConf() throws UnknownHostException {
+ coordinatorURL = InetAddress.getLocalHost().getHostAddress() + ":8888";
+ String resourceFilePath =
"src/test/resources/fakesource_to_druid.conf";
+ Path path = Paths.get(resourceFilePath);
+ try {
+ List<String> lines = Files.readAllLines(path);
+ List<String> newLines =
+ lines.stream()
+ .map(
+ line -> {
+ if (line.contains("coordinatorUrl")) {
+ return " coordinatorUrl = "
+ + "\""
+ + coordinatorURL
+ + "\"";
+ }
+ return line;
+ })
+ .collect(Collectors.toList());
+ Files.write(path, newLines);
+ log.info("Conf has been updated successfully.");
+ } catch (IOException e) {
+ throw new RuntimeException("Change conf error", e);
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/docker-compose.yml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/docker-compose.yml
new file mode 100644
index 0000000000..a59b23014e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/docker-compose.yml
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+version: "2.2"
+
+volumes:
+ metadata_data: {}
+ middle_var: {}
+ historical_var: {}
+ broker_var: {}
+ coordinator_var: {}
+ router_var: {}
+ druid_shared: {}
+
+
+services:
+ chmod-service:
+ image: ubuntu:latest
+ user: "0"
+ command: sh -c "mkdir -p /opt/druid/shared && chmod -R a+rwx
/opt/druid/shared"
+ volumes:
+ - druid_shared:/opt/druid/shared
+
+ postgres:
+ image: postgres:latest
+ ports:
+ - "5432:5432"
+ volumes:
+ - metadata_data:/var/lib/postgresql/data
+ environment:
+ - POSTGRES_PASSWORD=FoolishPassword
+ - POSTGRES_USER=druid
+ - POSTGRES_DB=druid
+ depends_on:
+ - chmod-service
+
+ # Need 3.5 or later for container nodes
+ zookeeper:
+ image: zookeeper:3.5.10
+ ports:
+ - "2181:2181"
+ environment:
+ - ZOO_MY_ID=1
+ depends_on:
+ - chmod-service
+
+ coordinator:
+ image: apache/druid:24.0.1
+ volumes:
+ - druid_shared:/opt/druid/shared
+ - coordinator_var:/opt/druid/var
+ depends_on:
+ - zookeeper
+ - postgres
+ - chmod-service
+ ports:
+ - "8032:8081"
+ command:
+ - coordinator
+ env_file:
+ - environment
+
+ broker:
+ image: apache/druid:24.0.1
+ volumes:
+ - broker_var:/opt/druid/var
+ depends_on:
+ - zookeeper
+ - postgres
+ - coordinator
+ - chmod-service
+ ports:
+ - "8082:8082"
+ command:
+ - broker
+ env_file:
+ - environment
+
+ historical:
+ image: apache/druid:24.0.1
+ volumes:
+ - druid_shared:/opt/druid/shared
+ - historical_var:/opt/druid/var
+ depends_on:
+ - zookeeper
+ - postgres
+ - coordinator
+ - chmod-service
+ ports:
+ - "8083:8083"
+ command:
+ - historical
+ env_file:
+ - environment
+
+ middlemanager:
+ image: apache/druid:24.0.1
+ volumes:
+ - druid_shared:/opt/druid/shared
+ - middle_var:/opt/druid/var
+ depends_on:
+ - zookeeper
+ - postgres
+ - coordinator
+ - chmod-service
+ ports:
+ - "8091:8091"
+ - "8100-8105:8100-8105"
+ command:
+ - middleManager
+ env_file:
+ - environment
+
+ router:
+ image: apache/druid:24.0.1
+ volumes:
+ - router_var:/opt/druid/var
+ depends_on:
+ - zookeeper
+ - postgres
+ - coordinator
+ - chmod-service
+ ports:
+ - "8888:8888"
+ command:
+ - router
+ env_file:
+ - environment
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/environment
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/environment
new file mode 100644
index 0000000000..55737fcbcd
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/environment
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Java tuning
+#DRUID_XMX=1g
+#DRUID_XMS=1g
+#DRUID_MAXNEWSIZE=250m
+#DRUID_NEWSIZE=250m
+#DRUID_MAXDIRECTMEMORYSIZE=6172m
+DRUID_SINGLE_NODE_CONF=nano-quickstart
+
+druid_emitter_logging_logLevel=debug
+
+druid_extensions_loadList=["druid-histogram", "druid-datasketches",
"druid-lookups-cached-global", "postgresql-metadata-storage",
"druid-multi-stage-query"]
+
+druid_zk_service_host=zookeeper
+
+druid_metadata_storage_host=
+druid_metadata_storage_type=postgresql
+druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid
+druid_metadata_storage_connector_user=druid
+druid_metadata_storage_connector_password=FoolishPassword
+
+druid_coordinator_balancer_strategy=cachingCost
+
+druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g",
"-XX:MaxDirectMemorySize=2g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8",
"-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
+druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB
+
+druid_storage_type=local
+druid_storage_storageDirectory=/opt/druid/shared/segments
+druid_indexer_logs_type=file
+druid_indexer_logs_directory=/opt/druid/shared/indexing-logs
+
+druid_processing_numThreads=1
+druid_processing_numMergeBuffers=1
+druid_worker_capacity=1
+
+DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration
status="WARN"><Appenders><Console name="Console"
target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c -
%m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef
ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog"
additivity="false" level="DEBUG"><AppenderRef
ref="Console"/></Logger></Loggers></Configuration>
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid.conf
new file mode 100644
index 0000000000..7944c7c990
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ c_boolean = boolean
+ c_timestamp = timestamp
+ c_string = string
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(16, 1)"
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [true, "2020-02-02T02:02:02", "NEW", 1, 2, 3, 4, 4.3, 5.3,
6.3]
+ },
+ {
+ kind = INSERT
+ fields = [false, "2012-12-21T12:34:56", "AAA", 1, 1, 333, 323232,
3.1, 9.33333, 99999.99999999]
+ },
+ {
+ kind = INSERT
+ fields = [true, "2016-03-12T11:29:33", "BBB", 1, 2, 672, 546782, 7.9,
6.88888, 88888.45623489]
+ },
+ {
+ kind = INSERT
+ fields = [false, "2014-04-28T09:13:27", "CCC", 1, 1, 271, 683221,
4.8, 4.45271, 79277.68219012]
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ Druid {
+ coordinatorUrl = "localhost:8888"
+ datasource = "testDataSource"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 35a002fc4b..9f452425af 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -61,6 +61,7 @@
<module>connector-hbase-e2e</module>
<module>connector-web3j-e2e</module>
<module>connector-maxcompute-e2e</module>
+ <module>connector-druid-e2e</module>
<module>connector-google-firestore-e2e</module>
<module>connector-rocketmq-e2e</module>
<module>connector-file-obs-e2e</module>