This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git
The following commit(s) were added to refs/heads/master by this push:
new bfabb14f feat(loader): support kafka as datasource (#506)
bfabb14f is described below
commit bfabb14fffceba86d07a331ad2b92890b64f7813
Author: Liu Xiao <[email protected]>
AuthorDate: Sat Oct 7 16:22:23 2023 +0800
feat(loader): support kafka as datasource (#506)
https://github.com/apache/incubator-hugegraph-doc/pull/290
---
.github/workflows/loader-ci.yml | 1 +
hugegraph-loader/pom.xml | 23 ++
.../apache/hugegraph/loader/HugeGraphLoader.java | 8 +-
.../hugegraph/loader/constant/Constants.java | 9 +
.../hugegraph/loader/reader/InputReader.java | 11 +
.../hugegraph/loader/reader/kafka/KafkaReader.java | 172 +++++++++++
.../loader/serializer/InputSourceDeser.java | 3 +
.../apache/hugegraph/loader/source/SourceType.java | 4 +-
.../hugegraph/loader/source/kafka/KafkaSource.java | 85 ++++++
.../apache/hugegraph/loader/util/DataTypeUtil.java | 28 ++
.../loader/test/functional/KafkaLoadTest.java | 329 +++++++++++++++++++++
.../loader/test/functional/KafkaUtil.java | 86 ++++++
.../kafka_customized_schema/schema.groovy | 33 +++
.../resources/kafka_customized_schema/struct.json | 89 ++++++
.../resources/kafka_format_csv/schema.groovy} | 15 +-
.../test/resources/kafka_format_csv/struct.json | 26 ++
.../kafka_format_not_support/schema.groovy} | 18 +-
.../resources/kafka_format_not_support/struct.json | 45 +++
.../resources/kafka_format_text/schema.groovy} | 15 +-
.../test/resources/kafka_format_text/struct.json | 27 ++
.../kafka_number_to_string/schema.groovy} | 18 +-
.../resources/kafka_number_to_string/struct.json | 45 +++
.../resources/kafka_value_mapping/schema.groovy} | 15 +-
.../test/resources/kafka_value_mapping/struct.json | 31 ++
24 files changed, 1083 insertions(+), 53 deletions(-)
diff --git a/.github/workflows/loader-ci.yml b/.github/workflows/loader-ci.yml
index ad4f5289..0a7db522 100644
--- a/.github/workflows/loader-ci.yml
+++ b/.github/workflows/loader-ci.yml
@@ -64,6 +64,7 @@ jobs:
mvn test -P file
mvn test -P hdfs
mvn test -P jdbc
+ mvn test -P kafka
- name: Upload coverage to Codecov
uses: codecov/[email protected]
diff --git a/hugegraph-loader/pom.xml b/hugegraph-loader/pom.xml
index c62b8b28..e6d121be 100644
--- a/hugegraph-loader/pom.xml
+++ b/hugegraph-loader/pom.xml
@@ -51,6 +51,7 @@
<mysql.connector.version>8.0.28</mysql.connector.version>
<postgres.version>42.4.1</postgres.version>
<mssql.jdbc.version>7.2.0.jre8</mssql.jdbc.version>
+ <kafka.testcontainer.version>1.19.0</kafka.testcontainer.version>
</properties>
<dependencies>
@@ -515,6 +516,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${kafka.testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
@@ -615,6 +627,17 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>kafka</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <properties>
+ <source_type>kafka</source_type>
+ <store_path>/files</store_path>
+ <test-classes>**/KafkaLoadTest.java</test-classes>
+ </properties>
+ </profile>
</profiles>
<build>
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java
index 95aca981..a46ff592 100644
---
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java
+++
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
@@ -239,8 +240,11 @@ public final class HugeGraphLoader {
try {
// Read next line from data source
if (reader.hasNext()) {
- lines.add(reader.next());
- metrics.increaseReadSuccess();
+ Line next = reader.next();
+ if (Objects.nonNull(next)) {
+ lines.add(next);
+ metrics.increaseReadSuccess();
+ }
} else {
finished = true;
}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/constant/Constants.java
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/constant/Constants.java
index 563cd401..51f51491 100644
---
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/constant/Constants.java
+++
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/constant/Constants.java
@@ -81,4 +81,13 @@ public final class Constants {
public static final String LOAD_DATA_PARSE_SUFFIX = "parse";
public static final String LOAD_DATA_SER_SUFFIX = "ser";
public static final String LOAD_DATA_INSERT_SUFFIX = "insert";
+
+ public static final long KAFKA_SESSION_TIMEOUT = 30000;
+ public static final long KAFKA_AUTO_COMMIT_INTERVAL = 1000;
+ public static final String KAFKA_AUTO_COMMIT = "true";
+ public static final String KAFKA_EARLIEST_OFFSET = "earliest";
+ public static final String KAFKA_LATEST_OFFSET = "latest";
+ public static final long KAFKA_POLL_DURATION = 1000;
+ public static final long KAFKA_POLL_GAP_INTERVAL = 1000;
+
}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/InputReader.java
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/InputReader.java
index 8e6599f9..566bac12 100644
---
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/InputReader.java
+++
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/InputReader.java
@@ -17,6 +17,9 @@
package org.apache.hugegraph.loader.reader;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
import org.apache.hugegraph.loader.constant.AutoCloseableIterator;
import org.apache.hugegraph.loader.exception.InitException;
import org.apache.hugegraph.loader.executor.LoadContext;
@@ -24,11 +27,13 @@ import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.reader.file.LocalFileReader;
import org.apache.hugegraph.loader.reader.hdfs.HDFSFileReader;
import org.apache.hugegraph.loader.reader.jdbc.JDBCReader;
+import org.apache.hugegraph.loader.reader.kafka.KafkaReader;
import org.apache.hugegraph.loader.reader.line.Line;
import org.apache.hugegraph.loader.source.InputSource;
import org.apache.hugegraph.loader.source.file.FileSource;
import org.apache.hugegraph.loader.source.hdfs.HDFSSource;
import org.apache.hugegraph.loader.source.jdbc.JDBCSource;
+import org.apache.hugegraph.loader.source.kafka.KafkaSource;
/**
* Responsible for continuously reading the next batch of data lines
@@ -51,9 +56,15 @@ public interface InputReader extends
AutoCloseableIterator<Line> {
return new HDFSFileReader((HDFSSource) source);
case JDBC:
return new JDBCReader((JDBCSource) source);
+ case KAFKA:
+ return new KafkaReader((KafkaSource) source);
default:
throw new AssertionError(String.format("Unsupported input
source '%s'",
source.type()));
}
}
+
+ default List<InputReader> split() {
+ throw new NotImplementedException("Not support multiple readers");
+ }
}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/kafka/KafkaReader.java
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/kafka/KafkaReader.java
new file mode 100644
index 00000000..40423da5
--- /dev/null
+++
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/kafka/KafkaReader.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.loader.reader.kafka;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Properties;
+import java.util.Queue;
+
+import org.apache.hugegraph.loader.constant.Constants;
+import org.apache.hugegraph.loader.exception.InitException;
+import org.apache.hugegraph.loader.executor.LoadContext;
+import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.loader.parser.CsvLineParser;
+import org.apache.hugegraph.loader.parser.JsonLineParser;
+import org.apache.hugegraph.loader.parser.LineParser;
+import org.apache.hugegraph.loader.parser.TextLineParser;
+import org.apache.hugegraph.loader.reader.AbstractReader;
+import org.apache.hugegraph.loader.reader.line.Line;
+import org.apache.hugegraph.loader.source.file.FileFormat;
+import org.apache.hugegraph.loader.source.kafka.KafkaSource;
+import org.apache.hugegraph.util.Log;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableList;
+
+import lombok.SneakyThrows;
+
+public class KafkaReader extends AbstractReader {
+
+ private static final Logger LOG = Log.logger(KafkaReader.class);
+
+ private final KafkaSource source;
+
+ private final LineParser parser;
+ private Queue<String> batch;
+
+ private static final String BASE_CONSUMER_GROUP = "kafka-reader-base";
+ private final KafkaConsumer dataConsumer;
+ private final boolean earlyStop;
+ private boolean emptyPoll;
+
+ public KafkaReader(KafkaSource source) {
+ this.source = source;
+
+ this.dataConsumer = createKafkaConsumer();
+ this.parser = createLineParser();
+ this.earlyStop = source.isEarlyStop();
+ }
+
+ @Override
+ public void init(LoadContext context,
+ InputStruct struct) throws InitException {
+ this.progress(context, struct);
+ }
+
+ @Override
+ public void confirmOffset() {
+ // Do Nothing
+ }
+
+ @Override
+ public void close() {
+ this.dataConsumer.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !this.earlyStop || !this.emptyPoll;
+ }
+
+ @Override
+ public Line next() {
+ if (batch == null || batch.size() == 0) {
+ batch = nextBatch();
+ }
+
+ String rawValue = batch.poll();
+ if (rawValue != null) {
+ return this.parser.parse(this.source.header(), rawValue);
+ } else {
+ this.emptyPoll = true;
+ }
+
+ return null;
+ }
+
+ private int getKafkaTopicPartitionCount() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", this.source.getBootstrapServer());
+ props.put("group.id", BASE_CONSUMER_GROUP);
+
+ KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(props);
+ int count = consumer.partitionsFor(this.source.getTopic()).size();
+ consumer.close();
+
+ return count;
+ }
+
+ private KafkaConsumer<String, String> createKafkaConsumer() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", this.source.getBootstrapServer());
+ props.put("max.poll.records", this.source.getBatchSize());
+ props.put("group.id", this.source.getGroup());
+ props.put("enable.auto.commit", Constants.KAFKA_AUTO_COMMIT);
+ props.put("auto.commit.interval.ms",
String.valueOf(Constants.KAFKA_AUTO_COMMIT_INTERVAL));
+ props.put("session.timeout.ms",
String.valueOf(Constants.KAFKA_SESSION_TIMEOUT));
+ if (this.source.isFromBeginning()) {
+ props.put("auto.offset.reset", Constants.KAFKA_EARLIEST_OFFSET);
+ } else {
+ props.put("auto.offset.reset", Constants.KAFKA_LATEST_OFFSET);
+ }
+ props.put("key.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(ImmutableList.of(this.source.getTopic()));
+ return consumer;
+ }
+
+ @SneakyThrows
+ private Deque<String> nextBatch() {
+ ConsumerRecords<String, String> records =
+
dataConsumer.poll(Duration.ofMillis(Constants.KAFKA_POLL_DURATION));
+ Deque<String> queue = new ArrayDeque<>(records.count());
+ if (records.count() == 0) {
+ Thread.sleep(Constants.KAFKA_POLL_GAP_INTERVAL);
+ } else {
+ for (ConsumerRecord<String, String> record : records) {
+ queue.add(record.value());
+ }
+ }
+
+ return queue;
+ }
+
+ private LineParser createLineParser() {
+ FileFormat format = source.getFormat();
+ switch (format) {
+ case CSV:
+ return new CsvLineParser();
+ case TEXT:
+ return new TextLineParser(source.getDelimiter());
+ case JSON:
+ return new JsonLineParser();
+ default:
+ throw new AssertionError(String.format(
+ "Unsupported file format '%s' of source '%s'",
+ format, source));
+ }
+ }
+}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/serializer/InputSourceDeser.java
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/serializer/InputSourceDeser.java
index 37dcef81..d582adb0 100644
---
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/serializer/InputSourceDeser.java
+++
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/serializer/InputSourceDeser.java
@@ -19,6 +19,7 @@ package org.apache.hugegraph.loader.serializer;
import java.io.IOException;
+import org.apache.hugegraph.loader.source.kafka.KafkaSource;
import org.apache.hugegraph.loader.util.JsonUtil;
import org.apache.hugegraph.loader.source.InputSource;
import org.apache.hugegraph.loader.source.SourceType;
@@ -66,6 +67,8 @@ public class InputSourceDeser extends
JsonDeserializer<InputSource> {
.toUpperCase());
objectNode.replace(FIELD_VENDOR, vendorNode);
return JsonUtil.convert(node, JDBCSource.class);
+ case KAFKA:
+ return JsonUtil.convert(node, KafkaSource.class);
default:
throw new AssertionError(String.format("Unsupported input
source '%s'", type));
}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
index ed54933f..008b50cd 100644
---
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
@@ -23,5 +23,7 @@ public enum SourceType {
HDFS,
- JDBC
+ JDBC,
+
+ KAFKA
}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/kafka/KafkaSource.java
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/kafka/KafkaSource.java
new file mode 100644
index 00000000..ec472ca7
--- /dev/null
+++
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/kafka/KafkaSource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.loader.source.kafka;
+
+import java.util.List;
+
+import org.apache.hugegraph.loader.source.AbstractSource;
+import org.apache.hugegraph.loader.source.SourceType;
+import org.apache.hugegraph.loader.source.file.FileFormat;
+import org.apache.hugegraph.loader.source.file.FileSource;
+import org.apache.hugegraph.loader.source.file.SkippedLine;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class KafkaSource extends AbstractSource {
+
+ @JsonProperty("bootstrap_server")
+ private String bootstrapServer;
+
+ @JsonProperty("topic")
+ private String topic;
+
+ @JsonProperty("group")
+ private String group;
+
+ @JsonProperty("from_beginning")
+ private boolean fromBeginning = false;
+
+ private FileFormat format;
+
+ @JsonProperty("delimiter")
+ private String delimiter;
+
+ @JsonProperty("date_format")
+ private String dateFormat;
+
+ @JsonProperty("extra_date_formats")
+ private List<String> extraDateFormats;
+
+ @JsonProperty("time_zone")
+ private String timeZone;
+
+ @JsonProperty("skipped_line")
+ private SkippedLine skippedLine;
+
+ @JsonProperty("batch_size")
+ private int batchSize = 500;
+
+ @JsonProperty("early_stop")
+ private boolean earlyStop = false;
+
+ @Override
+ public SourceType type() {
+ return SourceType.KAFKA;
+ }
+
+ @Override
+ public FileSource asFileSource() {
+ FileSource source = new FileSource();
+ source.header(this.header());
+ source.charset(this.charset());
+ source.listFormat(this.listFormat());
+ return source;
+ }
+}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/util/DataTypeUtil.java
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/util/DataTypeUtil.java
index 3052058d..80a441bc 100644
---
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/util/DataTypeUtil.java
+++
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/util/DataTypeUtil.java
@@ -20,6 +20,7 @@ package org.apache.hugegraph.loader.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -29,6 +30,7 @@ import org.apache.hugegraph.loader.source.AbstractSource;
import org.apache.hugegraph.loader.source.InputSource;
import org.apache.hugegraph.loader.source.file.FileSource;
import org.apache.hugegraph.loader.source.file.ListFormat;
+import org.apache.hugegraph.loader.source.kafka.KafkaSource;
import org.apache.hugegraph.structure.constant.Cardinality;
import org.apache.hugegraph.structure.constant.DataType;
import org.apache.hugegraph.structure.schema.PropertyKey;
@@ -140,6 +142,31 @@ public final class DataTypeUtil {
"to Date, but got '%s'", source.getClass().getName());
String dateFormat = ((FileSource) source).dateFormat();
String timeZone = ((FileSource) source).timeZone();
+
+ if (source instanceof KafkaSource) {
+ List<String> extraDateFormats =
+ ((KafkaSource) source).getExtraDateFormats();
+ dateFormat = ((KafkaSource) source).getDateFormat();
+ timeZone = ((KafkaSource) source).getTimeZone();
+ if (extraDateFormats == null || extraDateFormats.isEmpty()) {
+ return parseDate(key, value, dateFormat, timeZone);
+ } else {
+ HashSet<String> allDateFormats = new HashSet<>();
+ allDateFormats.add(dateFormat);
+ allDateFormats.addAll(extraDateFormats);
+ int size = allDateFormats.size();
+ for (String df : allDateFormats) {
+ try {
+ return parseDate(key, value, df, timeZone);
+ } catch (Exception e) {
+ if (--size <= 0) {
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
return parseDate(key, value, dateFormat, timeZone);
} else if (dataType.isUUID()) {
return parseUUID(key, value);
@@ -152,6 +179,7 @@ public final class DataTypeUtil {
"The value(key='%s') '%s'(%s) is not match with " +
"data type %s and can't convert to it",
key, value, value.getClass(), dataType);
+
return value;
}
diff --git
a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaLoadTest.java
b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaLoadTest.java
new file mode 100644
index 00000000..25193e55
--- /dev/null
+++
b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaLoadTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.loader.test.functional;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hugegraph.loader.HugeGraphLoader;
+import org.apache.hugegraph.rest.SerializeException;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.testutil.Assert;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class KafkaLoadTest extends LoadTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaLoadTest.class);
+
+ @BeforeClass
+ public static void setUp() throws JsonProcessingException {
+ clearServerData();
+ KafkaUtil.prepareEnv();
+ mockVertexPersonData();
+ mockVertexSoftwareData();
+ mockEdgeKnowsData();
+ mockEdgeCreatedData();
+ mockVertexPersonValueMapping();
+ mockVertexPersonFormatText();
+ mockVertexPersonFormatCsv();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ KafkaUtil.close();
+ }
+
+ @Before
+ public void init() {
+ }
+
+ @After
+ public void clear() {
+ clearServerData();
+ }
+
+ @Test
+ public void testCustomizedSchema() {
+ String[] args = new String[]{
+ "-f", configPath("kafka_customized_schema/struct.json"),
+ "-s", configPath("kafka_customized_schema/schema.groovy"),
+ "-g", GRAPH,
+ "-h", SERVER,
+ "-p", String.valueOf(PORT),
+ "--batch-insert-threads", "2",
+ "--test-mode", "true"
+ };
+
+ HugeGraphLoader.main(args);
+
+ List<Vertex> vertices = CLIENT.graph().listVertices();
+ List<Edge> edges = CLIENT.graph().listEdges();
+
+ Assert.assertEquals(7, vertices.size());
+ Assert.assertEquals(6, edges.size());
+
+ for (Vertex vertex : vertices) {
+ Assert.assertEquals(Integer.class, vertex.id().getClass());
+ }
+ for (Edge edge : edges) {
+ Assert.assertEquals(Integer.class, edge.sourceId().getClass());
+ Assert.assertEquals(Integer.class, edge.targetId().getClass());
+ }
+ }
+
+ @Test
+ public void testNumberToStringInKafkaSource() {
+ String[] args = new String[]{
+ "-f", configPath("kafka_number_to_string/struct.json"),
+ "-s", configPath("kafka_number_to_string/schema.groovy"),
+ "-g", GRAPH,
+ "-h", SERVER,
+ "-p", String.valueOf(PORT),
+ "--batch-insert-threads", "2",
+ "--test-mode", "true"
+ };
+
+ HugeGraphLoader.main(args);
+ List<Vertex> vertices = CLIENT.graph().listVertices();
+
+ Assert.assertEquals(7, vertices.size());
+ assertContains(vertices, "person",
+ "name", "marko", "age", "29", "city", "Beijing");
+ assertContains(vertices, "software",
+ "name", "ripple", "lang", "java", "price", "199.67");
+ }
+
+ @Test
+ public void testValueMappingInKafkaSource() {
+ String[] args = new String[]{
+ "-f", configPath("kafka_value_mapping/struct.json"),
+ "-s", configPath("kafka_value_mapping/schema.groovy"),
+ "-g", GRAPH,
+ "-h", SERVER,
+ "-p", String.valueOf(PORT),
+ "--batch-insert-threads", "2",
+ "--test-mode", "true"
+ };
+
+ HugeGraphLoader.main(args);
+
+ List<Vertex> vertices = CLIENT.graph().listVertices();
+ Assert.assertEquals(2, vertices.size());
+ assertContains(vertices, "person", "name", "marko", "age", 29, "city",
"Beijing");
+ assertContains(vertices, "person", "name", "vadas", "age", 27, "city",
"Shanghai");
+ }
+
+ @Test
+ public void testKafkaFormatNotSupport() {
+ String[] args = new String[]{
+ "-f", configPath("kafka_format_not_support/struct.json"),
+ "-s", configPath("kafka_format_not_support/schema.groovy"),
+ "-g", GRAPH,
+ "-h", SERVER,
+ "-p", String.valueOf(PORT),
+ "--batch-insert-threads", "2",
+ "--test-mode", "true"
+ };
+
+ Assert.assertThrows(SerializeException.class, () -> {
+ HugeGraphLoader.main(args);
+ });
+ }
+
+ @Test
+ public void testKafkaTextFormat() {
+ String[] args = new String[]{
+ "-f", configPath("kafka_format_text/struct.json"),
+ "-s", configPath("kafka_format_text/schema.groovy"),
+ "-g", GRAPH,
+ "-h", SERVER,
+ "-p", String.valueOf(PORT),
+ "--batch-insert-threads", "2",
+ "--test-mode", "true"
+ };
+
+ HugeGraphLoader.main(args);
+
+ List<Vertex> vertices = CLIENT.graph().listVertices();
+ Assert.assertEquals(2, vertices.size());
+
+ assertContains(vertices, "person", "name", "marko", "age", 29, "city",
"Beijing");
+ assertContains(vertices, "person", "name", "vadas", "age", 27, "city",
"Shanghai");
+ }
+
+ @Test
+ public void testKafkaCsvFormat() {
+ String[] args = new String[]{
+ "-f", configPath("kafka_format_csv/struct.json"),
+ "-s", configPath("kafka_format_csv/schema.groovy"),
+ "-g", GRAPH,
+ "-h", SERVER,
+ "-p", String.valueOf(PORT),
+ "--batch-insert-threads", "2",
+ "--test-mode", "true"
+ };
+
+ HugeGraphLoader.main(args);
+
+ List<Vertex> vertices = CLIENT.graph().listVertices();
+ Assert.assertEquals(2, vertices.size());
+
+ assertContains(vertices, "person", "name", "marko", "age", 29, "city",
"Beijing");
+ assertContains(vertices, "person", "name", "vadas", "age", 27, "city",
"Shanghai");
+ }
+
+ private static void mockVertexPersonFormatCsv() {
+ String topicName = "vertex-format-csv";
+ Object[] objects = {
+ "1,marko,29,Beijing",
+ "2,vadas,27,Shanghai"
+ };
+ KafkaUtil.createTopic(topicName);
+ commonMockTextData(objects, topicName);
+ }
+
+ private static void mockVertexPersonFormatText() {
+ String topicName = "vertex-format-text";
+ Object[] objects = {
+ "1\tmarko\t29\tBeijing",
+ "2\tvadas\t27\tShanghai"
+ };
+ KafkaUtil.createTopic(topicName);
+ commonMockTextData(objects, topicName);
+ }
+
+ private static void mockVertexPersonValueMapping() throws
JsonProcessingException {
+ String topicName = "vertex-person-value-mapping";
+ String[] keys = {"id", "name", "age", "city"};
+ Object[][] objects = {
+ {1, "marko", 29, "1"},
+ {2, "vadas", 27, "2"}
+ };
+ KafkaUtil.createTopic(topicName);
+ commonMockData(keys, objects, topicName);
+ }
+
+ private static void mockVertexPersonData() throws JsonProcessingException {
+ String topicName = "vertex-person";
+ String[] keys = {"id", "name", "age", "city"};
+ Object[][] objects = {
+ {1, "marko", 29, "Beijing"},
+ {2, "vadas", 27, "HongKong"},
+ {3, "josh", 32, "Beijing"},
+ {4, "peter", 35, "Shanghai"},
+ {5, "peter", 26, "Wu,han"}
+ };
+ KafkaUtil.createTopic(topicName);
+ commonMockData(keys, objects, topicName);
+ }
+
+ private static void mockVertexSoftwareData() throws
JsonProcessingException {
+ String topicName = "vertex-software";
+ String[] keys = {"id", "name", "lang", "price"};
+ Object[][] objects = {
+ {100, "lop", "java", 328.00},
+ {200, "ripple", "java", 199.67}
+ };
+ KafkaUtil.createTopic(topicName);
+ commonMockData(keys, objects, topicName);
+ }
+
+ private static void mockEdgeKnowsData() throws JsonProcessingException {
+ String topicName = "edge-knows";
+ String[] keys = {"id", "source_id", "target_id", "date", "weight"};
+ Object[][] objects = {
+ {1, 1, 2, "2016-01-10", 0.50},
+ {2, 1, 3, "2013-02-20", 1.00}
+ };
+ KafkaUtil.createTopic(topicName);
+ commonMockData(keys, objects, topicName);
+ }
+
+ private static void mockEdgeCreatedData() throws JsonProcessingException {
+ String topicName = "edge-created";
+ String[] keys = {"id", "source_id", "target_id", "date", "weight"};
+ Object[][] objects = {
+ {1, 1, 100, "2017-12-10", 0.40},
+ {2, 3, 100, "2009-11-11", 0.40},
+ {3, 3, 200, "2017-12-10", 1.00},
+ {4, 4, 100, "2017-03-24", 0.20}
+ };
+ KafkaUtil.createTopic(topicName);
+ commonMockData(keys, objects, topicName);
+ }
+
+ @NotNull
+ private static Producer<String, String> createKafkaProducer() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", KafkaUtil.getBootStrapServers());
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+ Producer<String, String> producer = new KafkaProducer<>(props);
+ return producer;
+ }
+
+ private static void commonMockData(String[] keys, Object[][] objects,
String topic)
+ throws JsonProcessingException {
+
+ Producer<String, String> producer = createKafkaProducer();
+
+ for (Object[] object : objects) {
+ Map<String, Object> map = new HashMap<>();
+ for (int i = 0; i < keys.length; i++) {
+ map.put(keys[i], object[i]);
+ }
+ ObjectMapper objectMapper = new ObjectMapper();
+ String value = objectMapper.writeValueAsString(map);
+ ProducerRecord<String, String> record = new
ProducerRecord<>(topic, value);
+ producer.send(record);
+ }
+
+ producer.flush();
+ producer.close();
+ }
+
+ private static void commonMockTextData(Object[] objects, String topicName)
{
+ Producer<String, String> producer = createKafkaProducer();
+
+ for (Object object : objects) {
+ ProducerRecord<String, String> record =
+ new ProducerRecord<>(topicName, object.toString());
+ producer.send(record);
+ }
+
+ producer.flush();
+ producer.close();
+ }
+}
diff --git
a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaUtil.java
b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaUtil.java
new file mode 100644
index 00000000..d8f18626
--- /dev/null
+++
b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.loader.test.functional;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
+
+public class KafkaUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaUtil.class);
+
+ private static final DockerImageName KAFKA_DOCKER_IMAGE =
+ DockerImageName.parse("confluentinc/cp-kafka:6.2.1");
+
+ private static KafkaContainer kafkaContainer;
+
+ private static String bootStrapServers;
+
+ private static AdminClient adminClient;
+
+ public static void prepareEnv() {
+ startService();
+ createAdminClient();
+ }
+
+ public static void close() {
+ adminClient.close();
+ kafkaContainer.close();
+ }
+
+ private static void startService() {
+ kafkaContainer = new KafkaContainer(KAFKA_DOCKER_IMAGE);
+ kafkaContainer.setPortBindings(Arrays.asList("9092:9092",
"9093:9093"));
+ kafkaContainer.start();
+ kafkaContainer.waitingFor(Wait.defaultWaitStrategy());
+ bootStrapServers = kafkaContainer.getBootstrapServers();
+ LOG.info("Kafka Container run successfully, bootstrapServer: {}.",
bootStrapServers);
+ }
+
+ private static void createAdminClient() {
+ Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootStrapServers);
+ adminClient = AdminClient.create(properties);
+ }
+
+ public static void createTopic(String topicName) {
+ int numPartitions = 1;
+ int replicationFactor = 1;
+ NewTopic newTopic =
+ new NewTopic(topicName, numPartitions, (short)
replicationFactor);
+ adminClient.createTopics(Collections.singleton(newTopic));
+ }
+
+ public static String getBootStrapServers() {
+ return bootStrapServers;
+ }
+
+ public static AdminClient getAdminClient() {
+ return adminClient;
+ }
+}
diff --git
a/hugegraph-loader/src/test/resources/kafka_customized_schema/schema.groovy
b/hugegraph-loader/src/test/resources/kafka_customized_schema/schema.groovy
new file mode 100644
index 00000000..ed703574
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_customized_schema/schema.groovy
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("weight").asDouble().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("date").asText().ifNotExist().create();
+schema.propertyKey("price").asDouble().ifNotExist().create();
+schema.propertyKey("feel").asText().valueList().ifNotExist().create();
+schema.propertyKey("time").asText().valueSet().ifNotExist().create();
+
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age",
"city").nullableKeys("age", "city").ifNotExist().create();
+schema.vertexLabel("software").useCustomizeNumberId().properties("name",
"lang", "price").ifNotExist().create();
+
+schema.edgeLabel("knows").sourceLabel("person").targetLabel("person").properties("date",
"weight").ifNotExist().create();
+schema.edgeLabel("created").sourceLabel("person").targetLabel("software").properties("date",
"weight").ifNotExist().create();
+schema.edgeLabel("use").sourceLabel("person").targetLabel("software").properties("feel",
"time").nullableKeys("feel", "time").ifNotExist().create();
diff --git
a/hugegraph-loader/src/test/resources/kafka_customized_schema/struct.json
b/hugegraph-loader/src/test/resources/kafka_customized_schema/struct.json
new file mode 100644
index 00000000..6e30520e
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_customized_schema/struct.json
@@ -0,0 +1,89 @@
+{
+ "version": "2.0",
+ "structs": [
+ {
+ "id": "1",
+ "input": {
+ "type": "kafka",
+ "bootstrap_server": "localhost:9093",
+ "topic": "vertex-person",
+ "from_beginning": true,
+ "group": "consumer_customize_schema",
+ "format": "JSON",
+ "early_stop": true
+ },
+ "vertices": [
+ {
+ "label": "person",
+ "id": "id",
+ "null_values": ["NULL"]
+ }
+ ],
+ "edges": []
+ },
+ {
+ "id": "2",
+ "input": {
+ "type": "KAFKA",
+ "bootstrap_server": "localhost:9093",
+ "topic": "vertex-software",
+ "from_beginning": true,
+ "group": "consumer_customize_schema",
+ "format": "JSON",
+ "early_stop": true,
+ "batch_size": 2
+ },
+ "vertices": [
+ {
+ "label": "software",
+ "id": "id"
+ }
+ ],
+ "edges": []
+ },
+ {
+ "id": "3",
+ "input": {
+ "type": "kafka",
+ "bootstrap_server": "localhost:9093",
+ "topic": "edge-knows",
+ "from_beginning": true,
+ "group": "consumer_customize_schema",
+ "format": "JSON",
+ "early_stop": true,
+ "batch_size": 2
+ },
+ "vertices": [],
+ "edges": [
+ {
+ "label": "knows",
+ "source": ["source_id"],
+ "target": ["target_id"],
+ "ignored": ["id"]
+ }
+ ]
+ },
+ {
+ "id": "4",
+ "input": {
+ "type": "kafka",
+ "bootstrap_server": "localhost:9093",
+ "topic": "edge-created",
+ "from_beginning": true,
+ "group": "consumer_customize_schema",
+ "format": "JSON",
+ "early_stop": true,
+ "batch_size": 2
+ },
+ "vertices": [],
+ "edges": [
+ {
+ "label": "created",
+ "source": ["source_id"],
+ "target": ["target_id"],
+ "ignored": ["id"]
+ }
+ ]
+ }
+ ]
+}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
b/hugegraph-loader/src/test/resources/kafka_format_csv/schema.groovy
similarity index 70%
copy from
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
copy to hugegraph-loader/src/test/resources/kafka_format_csv/schema.groovy
index ed54933f..305adf46 100644
---
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++ b/hugegraph-loader/src/test/resources/kafka_format_csv/schema.groovy
@@ -14,14 +14,9 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
-package org.apache.hugegraph.loader.source;
-
-public enum SourceType {
-
- FILE,
-
- HDFS,
-
- JDBC
-}
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age",
"city").nullableKeys("age", "city").ifNotExist().create();
diff --git a/hugegraph-loader/src/test/resources/kafka_format_csv/struct.json
b/hugegraph-loader/src/test/resources/kafka_format_csv/struct.json
new file mode 100644
index 00000000..963a3409
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_format_csv/struct.json
@@ -0,0 +1,26 @@
+{
+ "version": "2.0",
+ "structs": [
+ {
+ "id": "1",
+ "input": {
+ "type": "kafka",
+ "bootstrap_server": "localhost:9093",
+ "topic": "vertex-format-csv",
+ "from_beginning": true,
+ "group": "consumer_format_csv",
+ "format": "CSV",
+ "header": ["id", "name", "age", "city"],
+ "early_stop": true
+ },
+ "vertices": [
+ {
+ "label": "person",
+ "id": "id",
+ "null_values": ["NULL"]
+ }
+ ],
+ "edges": []
+ }
+ ]
+}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
b/hugegraph-loader/src/test/resources/kafka_format_not_support/schema.groovy
similarity index 56%
copy from
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
copy to
hugegraph-loader/src/test/resources/kafka_format_not_support/schema.groovy
index ed54933f..784bda54 100644
---
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++ b/hugegraph-loader/src/test/resources/kafka_format_not_support/schema.groovy
@@ -14,14 +14,12 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asText().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("price").asText().ifNotExist().create();
-package org.apache.hugegraph.loader.source;
-
-public enum SourceType {
-
- FILE,
-
- HDFS,
-
- JDBC
-}
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age",
"city").nullableKeys("name", "age", "city").ifNotExist().create();
+schema.vertexLabel("software").useCustomizeNumberId().properties("name",
"lang", "price").nullableKeys("name", "lang", "price").ifNotExist().create();
diff --git
a/hugegraph-loader/src/test/resources/kafka_format_not_support/struct.json
b/hugegraph-loader/src/test/resources/kafka_format_not_support/struct.json
new file mode 100644
index 00000000..75526ed8
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_format_not_support/struct.json
@@ -0,0 +1,45 @@
+{
+ "version": "2.0",
+ "structs": [
+ {
+ "id": "1",
+ "input": {
+ "type": "kafka",
+ "bootstrap_server": "localhost:9093",
+ "topic": "vertex-person",
+ "from_beginning": true,
+ "group": "consumer_format_not_support",
+ "format": "EXCEL",
+ "early_stop": true
+ },
+ "vertices": [
+ {
+ "label": "person",
+ "id": "id",
+ "null_values": ["NULL"]
+ }
+ ],
+ "edges": []
+ },
+ {
+ "id": "2",
+ "input": {
+ "type": "KAFKA",
+ "bootstrap_server": "localhost:9093",
+ "topic": "vertex-software",
+ "from_beginning": true,
+ "group": "consumer_format_not_support",
+ "format": "JSON",
+ "early_stop": true,
+ "batch_size": 2
+ },
+ "vertices": [
+ {
+ "label": "software",
+ "id": "id"
+ }
+ ],
+ "edges": []
+ }
+ ]
+}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
b/hugegraph-loader/src/test/resources/kafka_format_text/schema.groovy
similarity index 70%
copy from
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
copy to hugegraph-loader/src/test/resources/kafka_format_text/schema.groovy
index ed54933f..305adf46 100644
---
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++ b/hugegraph-loader/src/test/resources/kafka_format_text/schema.groovy
@@ -14,14 +14,9 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
-package org.apache.hugegraph.loader.source;
-
-public enum SourceType {
-
- FILE,
-
- HDFS,
-
- JDBC
-}
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age",
"city").nullableKeys("age", "city").ifNotExist().create();
diff --git a/hugegraph-loader/src/test/resources/kafka_format_text/struct.json
b/hugegraph-loader/src/test/resources/kafka_format_text/struct.json
new file mode 100644
index 00000000..95811bb2
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_format_text/struct.json
@@ -0,0 +1,27 @@
+{
+ "version": "2.0",
+ "structs": [
+ {
+ "id": "1",
+ "input": {
+ "type": "kafka",
+ "bootstrap_server": "localhost:9093",
+ "topic": "vertex-format-text",
+ "from_beginning": true,
+ "group": "consumer_format_text",
+ "format": "TEXT",
+ "header": ["id", "name", "age", "city"],
+ "delimiter": "\t",
+ "early_stop": true
+ },
+ "vertices": [
+ {
+ "label": "person",
+ "id": "id",
+ "null_values": ["NULL"]
+ }
+ ],
+ "edges": []
+ }
+ ]
+}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
b/hugegraph-loader/src/test/resources/kafka_number_to_string/schema.groovy
similarity index 56%
copy from
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
copy to hugegraph-loader/src/test/resources/kafka_number_to_string/schema.groovy
index ed54933f..784bda54 100644
---
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++ b/hugegraph-loader/src/test/resources/kafka_number_to_string/schema.groovy
@@ -14,14 +14,12 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asText().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("price").asText().ifNotExist().create();
-package org.apache.hugegraph.loader.source;
-
-public enum SourceType {
-
- FILE,
-
- HDFS,
-
- JDBC
-}
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age",
"city").nullableKeys("name", "age", "city").ifNotExist().create();
+schema.vertexLabel("software").useCustomizeNumberId().properties("name",
"lang", "price").nullableKeys("name", "lang", "price").ifNotExist().create();
diff --git
a/hugegraph-loader/src/test/resources/kafka_number_to_string/struct.json
b/hugegraph-loader/src/test/resources/kafka_number_to_string/struct.json
new file mode 100644
index 00000000..9ead6a3b
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_number_to_string/struct.json
@@ -0,0 +1,45 @@
+{
+ "version": "2.0",
+ "structs": [
+ {
+ "id": "1",
+ "input": {
+ "type": "kafka",
+ "bootstrap_server": "localhost:9093",
+ "topic": "vertex-person",
+ "from_beginning": true,
+ "group": "consumer_num_to_str",
+ "format": "JSON",
+ "early_stop": true
+ },
+ "vertices": [
+ {
+ "label": "person",
+ "id": "id",
+ "null_values": ["NULL"]
+ }
+ ],
+ "edges": []
+ },
+ {
+ "id": "2",
+ "input": {
+ "type": "KAFKA",
+ "bootstrap_server": "localhost:9093",
+ "topic": "vertex-software",
+ "from_beginning": true,
+ "group": "consumer_num_to_str",
+ "format": "JSON",
+ "early_stop": true,
+ "batch_size": 2
+ },
+ "vertices": [
+ {
+ "label": "software",
+ "id": "id"
+ }
+ ],
+ "edges": []
+ }
+ ]
+}
diff --git
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
b/hugegraph-loader/src/test/resources/kafka_value_mapping/schema.groovy
similarity index 70%
copy from
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
copy to hugegraph-loader/src/test/resources/kafka_value_mapping/schema.groovy
index ed54933f..305adf46 100644
---
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++ b/hugegraph-loader/src/test/resources/kafka_value_mapping/schema.groovy
@@ -14,14 +14,9 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
-package org.apache.hugegraph.loader.source;
-
-public enum SourceType {
-
- FILE,
-
- HDFS,
-
- JDBC
-}
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age",
"city").nullableKeys("age", "city").ifNotExist().create();
diff --git
a/hugegraph-loader/src/test/resources/kafka_value_mapping/struct.json
b/hugegraph-loader/src/test/resources/kafka_value_mapping/struct.json
new file mode 100644
index 00000000..332dbe64
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_value_mapping/struct.json
@@ -0,0 +1,31 @@
+{
+ "version": "2.0",
+ "structs": [
+ {
+ "id": "1",
+ "input": {
+ "type": "kafka",
+ "bootstrap_server": "localhost:9093",
+ "topic": "vertex-person-value-mapping",
+ "from_beginning": true,
+ "group": "consumer_value_mapping",
+ "format": "JSON",
+ "early_stop": true
+ },
+ "vertices": [
+ {
+ "label": "person",
+ "id": "id",
+ "null_values": ["NULL"],
+ "value_mapping": {
+ "city": {
+ "1": "Beijing",
+ "2": "Shanghai"
+ }
+ }
+ }
+ ],
+ "edges": []
+ }
+ ]
+}