This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git


The following commit(s) were added to refs/heads/master by this push:
     new bfabb14f feat(loader): support kafka as datasource (#506)
bfabb14f is described below

commit bfabb14fffceba86d07a331ad2b92890b64f7813
Author: Liu Xiao <[email protected]>
AuthorDate: Sat Oct 7 16:22:23 2023 +0800

    feat(loader): support kafka as datasource (#506)
    
    https://github.com/apache/incubator-hugegraph-doc/pull/290
---
 .github/workflows/loader-ci.yml                    |   1 +
 hugegraph-loader/pom.xml                           |  23 ++
 .../apache/hugegraph/loader/HugeGraphLoader.java   |   8 +-
 .../hugegraph/loader/constant/Constants.java       |   9 +
 .../hugegraph/loader/reader/InputReader.java       |  11 +
 .../hugegraph/loader/reader/kafka/KafkaReader.java | 172 +++++++++++
 .../loader/serializer/InputSourceDeser.java        |   3 +
 .../apache/hugegraph/loader/source/SourceType.java |   4 +-
 .../hugegraph/loader/source/kafka/KafkaSource.java |  85 ++++++
 .../apache/hugegraph/loader/util/DataTypeUtil.java |  28 ++
 .../loader/test/functional/KafkaLoadTest.java      | 329 +++++++++++++++++++++
 .../loader/test/functional/KafkaUtil.java          |  86 ++++++
 .../kafka_customized_schema/schema.groovy          |  33 +++
 .../resources/kafka_customized_schema/struct.json  |  89 ++++++
 .../resources/kafka_format_csv/schema.groovy}      |  15 +-
 .../test/resources/kafka_format_csv/struct.json    |  26 ++
 .../kafka_format_not_support/schema.groovy}        |  18 +-
 .../resources/kafka_format_not_support/struct.json |  45 +++
 .../resources/kafka_format_text/schema.groovy}     |  15 +-
 .../test/resources/kafka_format_text/struct.json   |  27 ++
 .../kafka_number_to_string/schema.groovy}          |  18 +-
 .../resources/kafka_number_to_string/struct.json   |  45 +++
 .../resources/kafka_value_mapping/schema.groovy}   |  15 +-
 .../test/resources/kafka_value_mapping/struct.json |  31 ++
 24 files changed, 1083 insertions(+), 53 deletions(-)

diff --git a/.github/workflows/loader-ci.yml b/.github/workflows/loader-ci.yml
index ad4f5289..0a7db522 100644
--- a/.github/workflows/loader-ci.yml
+++ b/.github/workflows/loader-ci.yml
@@ -64,6 +64,7 @@ jobs:
           mvn test -P file
           mvn test -P hdfs
           mvn test -P jdbc
+          mvn test -P kafka
 
       - name: Upload coverage to Codecov
         uses: codecov/[email protected]
diff --git a/hugegraph-loader/pom.xml b/hugegraph-loader/pom.xml
index c62b8b28..e6d121be 100644
--- a/hugegraph-loader/pom.xml
+++ b/hugegraph-loader/pom.xml
@@ -51,6 +51,7 @@
         <mysql.connector.version>8.0.28</mysql.connector.version>
         <postgres.version>42.4.1</postgres.version>
         <mssql.jdbc.version>7.2.0.jre8</mssql.jdbc.version>
+        <kafka.testcontainer.version>1.19.0</kafka.testcontainer.version>
     </properties>
 
     <dependencies>
@@ -515,6 +516,17 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${kafka.testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
@@ -615,6 +627,17 @@
                 </plugins>
             </build>
         </profile>
+        <profile>
+            <id>kafka</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <properties>
+                <source_type>kafka</source_type>
+                <store_path>/files</store_path>
+                <test-classes>**/KafkaLoadTest.java</test-classes>
+            </properties>
+        </profile>
     </profiles>
 
     <build>
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java
index 95aca981..a46ff592 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
@@ -239,8 +240,11 @@ public final class HugeGraphLoader {
             try {
                 // Read next line from data source
                 if (reader.hasNext()) {
-                    lines.add(reader.next());
-                    metrics.increaseReadSuccess();
+                    Line next = reader.next();
+                    if (Objects.nonNull(next)) {
+                        lines.add(next);
+                        metrics.increaseReadSuccess();
+                    }
                 } else {
                     finished = true;
                 }
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/constant/Constants.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/constant/Constants.java
index 563cd401..51f51491 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/constant/Constants.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/constant/Constants.java
@@ -81,4 +81,13 @@ public final class Constants {
     public static final String LOAD_DATA_PARSE_SUFFIX = "parse";
     public static final String LOAD_DATA_SER_SUFFIX = "ser";
     public static final String LOAD_DATA_INSERT_SUFFIX = "insert";
+
+    public static final long KAFKA_SESSION_TIMEOUT = 30000;
+    public static final long KAFKA_AUTO_COMMIT_INTERVAL = 1000;
+    public static final String KAFKA_AUTO_COMMIT = "true";
+    public static final String KAFKA_EARLIEST_OFFSET = "earliest";
+    public static final String KAFKA_LATEST_OFFSET = "latest";
+    public static final long KAFKA_POLL_DURATION = 1000;
+    public static final long KAFKA_POLL_GAP_INTERVAL = 1000;
+
 }
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/InputReader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/InputReader.java
index 8e6599f9..566bac12 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/InputReader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/InputReader.java
@@ -17,6 +17,9 @@
 
 package org.apache.hugegraph.loader.reader;
 
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.hugegraph.loader.constant.AutoCloseableIterator;
 import org.apache.hugegraph.loader.exception.InitException;
 import org.apache.hugegraph.loader.executor.LoadContext;
@@ -24,11 +27,13 @@ import org.apache.hugegraph.loader.mapping.InputStruct;
 import org.apache.hugegraph.loader.reader.file.LocalFileReader;
 import org.apache.hugegraph.loader.reader.hdfs.HDFSFileReader;
 import org.apache.hugegraph.loader.reader.jdbc.JDBCReader;
+import org.apache.hugegraph.loader.reader.kafka.KafkaReader;
 import org.apache.hugegraph.loader.reader.line.Line;
 import org.apache.hugegraph.loader.source.InputSource;
 import org.apache.hugegraph.loader.source.file.FileSource;
 import org.apache.hugegraph.loader.source.hdfs.HDFSSource;
 import org.apache.hugegraph.loader.source.jdbc.JDBCSource;
+import org.apache.hugegraph.loader.source.kafka.KafkaSource;
 
 /**
  * Responsible for continuously reading the next batch of data lines
@@ -51,9 +56,15 @@ public interface InputReader extends AutoCloseableIterator<Line> {
                 return new HDFSFileReader((HDFSSource) source);
             case JDBC:
                 return new JDBCReader((JDBCSource) source);
+            case KAFKA:
+                return new KafkaReader((KafkaSource) source);
             default:
                throw new AssertionError(String.format("Unsupported input source '%s'",
                                                        source.type()));
         }
     }
+
+    default List<InputReader> split() {
+        throw new NotImplementedException("Multiple readers are not supported");
+    }
 }
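
InputReader extends AutoCloseableIterator<Line>, so callers drive it with the plain
iterator protocol. A minimal sketch of that loop, assuming the switch above sits in the
interface's static factory (named "create" here for illustration) and mirroring the null
guard HugeGraphLoader gains in this commit, since KafkaReader.next() may return null on
an empty poll:

    // Hedged sketch, not part of the commit: drain a reader to exhaustion.
    static void drain(InputSource source) throws Exception {
        try (InputReader reader = InputReader.create(source)) { // factory name assumed
            while (reader.hasNext()) {
                Line line = reader.next();
                if (line != null) {           // same guard as HugeGraphLoader
                    System.out.println(line); // stand-in for real processing
                }
            }
        }
    }
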
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/kafka/KafkaReader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/kafka/KafkaReader.java
new file mode 100644
index 00000000..40423da5
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/kafka/KafkaReader.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.loader.reader.kafka;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Properties;
+import java.util.Queue;
+
+import org.apache.hugegraph.loader.constant.Constants;
+import org.apache.hugegraph.loader.exception.InitException;
+import org.apache.hugegraph.loader.executor.LoadContext;
+import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.loader.parser.CsvLineParser;
+import org.apache.hugegraph.loader.parser.JsonLineParser;
+import org.apache.hugegraph.loader.parser.LineParser;
+import org.apache.hugegraph.loader.parser.TextLineParser;
+import org.apache.hugegraph.loader.reader.AbstractReader;
+import org.apache.hugegraph.loader.reader.line.Line;
+import org.apache.hugegraph.loader.source.file.FileFormat;
+import org.apache.hugegraph.loader.source.kafka.KafkaSource;
+import org.apache.hugegraph.util.Log;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableList;
+
+import lombok.SneakyThrows;
+
+public class KafkaReader extends AbstractReader {
+
+    private static final Logger LOG = Log.logger(KafkaReader.class);
+
+    private final KafkaSource source;
+
+    private final LineParser parser;
+    private Queue<String> batch;
+
+    private static final String BASE_CONSUMER_GROUP = "kafka-reader-base";
+    private final KafkaConsumer dataConsumer;
+    private final boolean earlyStop;
+    private boolean emptyPoll;
+
+    public KafkaReader(KafkaSource source) {
+        this.source = source;
+
+        this.dataConsumer = createKafkaConsumer();
+        this.parser = createLineParser();
+        this.earlyStop = source.isEarlyStop();
+    }
+
+    @Override
+    public void init(LoadContext context,
+                     InputStruct struct) throws InitException {
+        this.progress(context, struct);
+    }
+
+    @Override
+    public void confirmOffset() {
+        // Do Nothing
+    }
+
+    @Override
+    public void close() {
+        this.dataConsumer.close();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !this.earlyStop || !this.emptyPoll;
+    }
+
+    @Override
+    public Line next() {
+        if (batch == null || batch.size() == 0) {
+            batch = nextBatch();
+        }
+
+        String rawValue = batch.poll();
+        if (rawValue != null) {
+            return this.parser.parse(this.source.header(), rawValue);
+        } else {
+            this.emptyPoll = true;
+        }
+
+        return null;
+    }
+
+    private int getKafkaTopicPartitionCount() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", this.source.getBootstrapServer());
+        props.put("group.id", BASE_CONSUMER_GROUP);
+
+        KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(props);
+        int count = consumer.partitionsFor(this.source.getTopic()).size();
+        consumer.close();
+
+        return count;
+    }
+
+    private KafkaConsumer<String, String> createKafkaConsumer() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", this.source.getBootstrapServer());
+        props.put("max.poll.records", this.source.getBatchSize());
+        props.put("group.id", this.source.getGroup());
+        props.put("enable.auto.commit", Constants.KAFKA_AUTO_COMMIT);
+        props.put("auto.commit.interval.ms", 
String.valueOf(Constants.KAFKA_AUTO_COMMIT_INTERVAL));
+        props.put("session.timeout.ms", 
String.valueOf(Constants.KAFKA_SESSION_TIMEOUT));
+        if (this.source.isFromBeginning()) {
+            props.put("auto.offset.reset", Constants.KAFKA_EARLIEST_OFFSET);
+        } else {
+            props.put("auto.offset.reset", Constants.KAFKA_LATEST_OFFSET);
+        }
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(ImmutableList.of(this.source.getTopic()));
+        return consumer;
+    }
+
+    @SneakyThrows
+    private Deque<String> nextBatch() {
+        ConsumerRecords<String, String> records =
+                dataConsumer.poll(Duration.ofMillis(Constants.KAFKA_POLL_DURATION));
+        Deque<String> queue = new ArrayDeque<>(records.count());
+        if (records.count() == 0) {
+            Thread.sleep(Constants.KAFKA_POLL_GAP_INTERVAL);
+        } else {
+            for (ConsumerRecord<String, String> record : records) {
+                queue.add(record.value());
+            }
+        }
+
+        return queue;
+    }
+
+    private LineParser createLineParser() {
+        FileFormat format = source.getFormat();
+        switch (format) {
+            case CSV:
+                return new CsvLineParser();
+            case TEXT:
+                return new TextLineParser(source.getDelimiter());
+            case JSON:
+                return new JsonLineParser();
+            default:
+                throw new AssertionError(String.format(
+                        "Unsupported file format '%s' of source '%s'",
+                        format, source));
+        }
+    }
+}
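
For reference, the consumer pattern KafkaReader wraps can be sketched standalone with
kafka-clients alone. Broker, group id and topic below are placeholders (the loader takes
them from KafkaSource), and the imports match KafkaReader's:

    // Hedged sketch: string key/value deserialization, auto-commit, earliest
    // offset, one bounded poll; KafkaReader hands each value to a LineParser
    // chosen by the source's format.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9093"); // placeholder
    props.put("group.id", "kafka-reader-demo");       // placeholder
    props.put("enable.auto.commit", "true");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(ImmutableList.of("vertex-person")); // placeholder topic
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
    }
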
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/serializer/InputSourceDeser.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/serializer/InputSourceDeser.java
index 37dcef81..d582adb0 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/serializer/InputSourceDeser.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/serializer/InputSourceDeser.java
@@ -19,6 +19,7 @@ package org.apache.hugegraph.loader.serializer;
 
 import java.io.IOException;
 
+import org.apache.hugegraph.loader.source.kafka.KafkaSource;
 import org.apache.hugegraph.loader.util.JsonUtil;
 import org.apache.hugegraph.loader.source.InputSource;
 import org.apache.hugegraph.loader.source.SourceType;
@@ -66,6 +67,8 @@ public class InputSourceDeser extends JsonDeserializer<InputSource> {
                                                         .toUpperCase());
                 objectNode.replace(FIELD_VENDOR, vendorNode);
                 return JsonUtil.convert(node, JDBCSource.class);
+            case KAFKA:
+                return JsonUtil.convert(node, KafkaSource.class);
             default:
                throw new AssertionError(String.format("Unsupported input source '%s'", type));
         }
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
index ed54933f..008b50cd 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
@@ -23,5 +23,7 @@ public enum SourceType {
 
     HDFS,
 
-    JDBC
+    JDBC,
+
+    KAFKA
 }
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/kafka/KafkaSource.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/kafka/KafkaSource.java
new file mode 100644
index 00000000..ec472ca7
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/kafka/KafkaSource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.loader.source.kafka;
+
+import java.util.List;
+
+import org.apache.hugegraph.loader.source.AbstractSource;
+import org.apache.hugegraph.loader.source.SourceType;
+import org.apache.hugegraph.loader.source.file.FileFormat;
+import org.apache.hugegraph.loader.source.file.FileSource;
+import org.apache.hugegraph.loader.source.file.SkippedLine;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class KafkaSource extends AbstractSource {
+
+    @JsonProperty("bootstrap_server")
+    private String bootstrapServer;
+
+    @JsonProperty("topic")
+    private String topic;
+
+    @JsonProperty("group")
+    private String group;
+
+    @JsonProperty("from_beginning")
+    private boolean fromBeginning = false;
+
+    private FileFormat format;
+
+    @JsonProperty("delimiter")
+    private String delimiter;
+
+    @JsonProperty("date_format")
+    private String dateFormat;
+
+    @JsonProperty("extra_date_formats")
+    private List<String> extraDateFormats;
+
+    @JsonProperty("time_zone")
+    private String timeZone;
+
+    @JsonProperty("skipped_line")
+    private SkippedLine skippedLine;
+
+    @JsonProperty("batch_size")
+    private int batchSize = 500;
+
+    @JsonProperty("early_stop")
+    private boolean earlyStop = false;
+
+    @Override
+    public SourceType type() {
+        return SourceType.KAFKA;
+    }
+
+    @Override
+    public FileSource asFileSource() {
+        FileSource source = new FileSource();
+        source.header(this.header());
+        source.charset(this.charset());
+        source.listFormat(this.listFormat());
+        return source;
+    }
+}
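
KafkaSource is a plain Jackson-annotated bean, so the "input" block of a struct.json maps
onto it field by field. A minimal sketch of that conversion, using the same
JsonUtil.convert call InputSourceDeser makes above; the "type" discriminator is omitted
because the deserializer, not this bean, consumes it:

    // Hedged sketch; imports assumed: com.fasterxml.jackson.databind.ObjectMapper,
    // com.fasterxml.jackson.databind.JsonNode, and the loader's JsonUtil.
    ObjectMapper mapper = new ObjectMapper();
    JsonNode node = mapper.readTree(
            "{\"bootstrap_server\": \"localhost:9093\", \"topic\": \"vertex-person\", " +
            "\"group\": \"g1\", \"from_beginning\": true, \"format\": \"JSON\", " +
            "\"early_stop\": true}");
    KafkaSource source = JsonUtil.convert(node, KafkaSource.class);
    assert source.isFromBeginning();     // lombok @Data generates the accessors
    assert source.getBatchSize() == 500; // default when batch_size is absent
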
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/util/DataTypeUtil.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/util/DataTypeUtil.java
index 3052058d..80a441bc 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/util/DataTypeUtil.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/util/DataTypeUtil.java
@@ -20,6 +20,7 @@ package org.apache.hugegraph.loader.util;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -29,6 +30,7 @@ import org.apache.hugegraph.loader.source.AbstractSource;
 import org.apache.hugegraph.loader.source.InputSource;
 import org.apache.hugegraph.loader.source.file.FileSource;
 import org.apache.hugegraph.loader.source.file.ListFormat;
+import org.apache.hugegraph.loader.source.kafka.KafkaSource;
 import org.apache.hugegraph.structure.constant.Cardinality;
 import org.apache.hugegraph.structure.constant.DataType;
 import org.apache.hugegraph.structure.schema.PropertyKey;
@@ -140,6 +142,31 @@ public final class DataTypeUtil {
                          "to Date, but got '%s'", source.getClass().getName());
             String dateFormat = ((FileSource) source).dateFormat();
             String timeZone = ((FileSource) source).timeZone();
+
+            if (source instanceof KafkaSource) {
+                List<String> extraDateFormats =
+                        ((KafkaSource) source).getExtraDateFormats();
+                dateFormat = ((KafkaSource) source).getDateFormat();
+                timeZone = ((KafkaSource) source).getTimeZone();
+                if (extraDateFormats == null || extraDateFormats.isEmpty()) {
+                    return parseDate(key, value, dateFormat, timeZone);
+                } else {
+                    HashSet<String> allDateFormats = new HashSet<>();
+                    allDateFormats.add(dateFormat);
+                    allDateFormats.addAll(extraDateFormats);
+                    int size = allDateFormats.size();
+                    for (String df : allDateFormats) {
+                        try {
+                            return parseDate(key, value, df, timeZone);
+                        } catch (Exception e) {
+                            if (--size <= 0) {
+                                throw e;
+                            }
+                        }
+                    }
+                }
+            }
+
             return parseDate(key, value, dateFormat, timeZone);
         } else if (dataType.isUUID()) {
             return parseUUID(key, value);
@@ -152,6 +179,7 @@ public final class DataTypeUtil {
                         "The value(key='%s') '%s'(%s) is not match with " +
                         "data type %s and can't convert to it",
                         key, value, value.getClass(), dataType);
+
         return value;
     }
 
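
The Kafka branch above tries the primary date_format plus every extra_date_formats entry,
rethrowing only once the last candidate fails. The same first-match-wins fallback,
sketched standalone with SimpleDateFormat (the loader's parseDate helper is not shown in
this diff; a LinkedHashSet is used here so the primary format is tried first, a small
deviation from the commit's HashSet):

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.TimeZone;

    final class DateFallback {
        // Try each candidate pattern; surface the failure only when every
        // format has been exhausted, as DataTypeUtil now does for KafkaSource.
        static Date parse(String value, String primary, List<String> extras,
                          String timeZone) throws ParseException {
            Set<String> formats = new LinkedHashSet<>();
            formats.add(primary);
            formats.addAll(extras);
            int remaining = formats.size();
            for (String pattern : formats) {
                try {
                    SimpleDateFormat sdf = new SimpleDateFormat(pattern);
                    sdf.setTimeZone(TimeZone.getTimeZone(timeZone));
                    return sdf.parse(value);
                } catch (ParseException e) {
                    if (--remaining <= 0) {
                        throw e;
                    }
                }
            }
            throw new IllegalStateException("unreachable: formats is never empty");
        }
    }
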
diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaLoadTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaLoadTest.java
new file mode 100644
index 00000000..25193e55
--- /dev/null
+++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaLoadTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.loader.test.functional;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hugegraph.loader.HugeGraphLoader;
+import org.apache.hugegraph.rest.SerializeException;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.testutil.Assert;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class KafkaLoadTest extends LoadTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaLoadTest.class);
+
+    @BeforeClass
+    public static void setUp() throws JsonProcessingException {
+        clearServerData();
+        KafkaUtil.prepareEnv();
+        mockVertexPersonData();
+        mockVertexSoftwareData();
+        mockEdgeKnowsData();
+        mockEdgeCreatedData();
+        mockVertexPersonValueMapping();
+        mockVertexPersonFormatText();
+        mockVertexPersonFormatCsv();
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        KafkaUtil.close();
+    }
+
+    @Before
+    public void init() {
+    }
+
+    @After
+    public void clear() {
+        clearServerData();
+    }
+
+    @Test
+    public void testCustomizedSchema() {
+        String[] args = new String[]{
+                "-f", configPath("kafka_customized_schema/struct.json"),
+                "-s", configPath("kafka_customized_schema/schema.groovy"),
+                "-g", GRAPH,
+                "-h", SERVER,
+                "-p", String.valueOf(PORT),
+                "--batch-insert-threads", "2",
+                "--test-mode", "true"
+        };
+
+        HugeGraphLoader.main(args);
+
+        List<Vertex> vertices = CLIENT.graph().listVertices();
+        List<Edge> edges = CLIENT.graph().listEdges();
+
+        Assert.assertEquals(7, vertices.size());
+        Assert.assertEquals(6, edges.size());
+
+        for (Vertex vertex : vertices) {
+            Assert.assertEquals(Integer.class, vertex.id().getClass());
+        }
+        for (Edge edge : edges) {
+            Assert.assertEquals(Integer.class, edge.sourceId().getClass());
+            Assert.assertEquals(Integer.class, edge.targetId().getClass());
+        }
+    }
+
+    @Test
+    public void testNumberToStringInKafkaSource() {
+        String[] args = new String[]{
+                "-f", configPath("kafka_number_to_string/struct.json"),
+                "-s", configPath("kafka_number_to_string/schema.groovy"),
+                "-g", GRAPH,
+                "-h", SERVER,
+                "-p", String.valueOf(PORT),
+                "--batch-insert-threads", "2",
+                "--test-mode", "true"
+        };
+
+        HugeGraphLoader.main(args);
+        List<Vertex> vertices = CLIENT.graph().listVertices();
+
+        Assert.assertEquals(7, vertices.size());
+        assertContains(vertices, "person",
+                       "name", "marko", "age", "29", "city", "Beijing");
+        assertContains(vertices, "software",
+                       "name", "ripple", "lang", "java", "price", "199.67");
+    }
+
+    @Test
+    public void testValueMappingInKafkaSource() {
+        String[] args = new String[]{
+                "-f", configPath("kafka_value_mapping/struct.json"),
+                "-s", configPath("kafka_value_mapping/schema.groovy"),
+                "-g", GRAPH,
+                "-h", SERVER,
+                "-p", String.valueOf(PORT),
+                "--batch-insert-threads", "2",
+                "--test-mode", "true"
+        };
+
+        HugeGraphLoader.main(args);
+
+        List<Vertex> vertices = CLIENT.graph().listVertices();
+        Assert.assertEquals(2, vertices.size());
+        assertContains(vertices, "person", "name", "marko", "age", 29, "city", 
"Beijing");
+        assertContains(vertices, "person", "name", "vadas", "age", 27, "city", 
"Shanghai");
+    }
+
+    @Test
+    public void testKafkaFormatNotSupport() {
+        String[] args = new String[]{
+                "-f", configPath("kafka_format_not_support/struct.json"),
+                "-s", configPath("kafka_format_not_support/schema.groovy"),
+                "-g", GRAPH,
+                "-h", SERVER,
+                "-p", String.valueOf(PORT),
+                "--batch-insert-threads", "2",
+                "--test-mode", "true"
+        };
+
+        Assert.assertThrows(SerializeException.class, () -> {
+            HugeGraphLoader.main(args);
+        });
+    }
+
+    @Test
+    public void testKafkaTextFormat() {
+        String[] args = new String[]{
+                "-f", configPath("kafka_format_text/struct.json"),
+                "-s", configPath("kafka_format_text/schema.groovy"),
+                "-g", GRAPH,
+                "-h", SERVER,
+                "-p", String.valueOf(PORT),
+                "--batch-insert-threads", "2",
+                "--test-mode", "true"
+        };
+
+        HugeGraphLoader.main(args);
+
+        List<Vertex> vertices = CLIENT.graph().listVertices();
+        Assert.assertEquals(2, vertices.size());
+
+        assertContains(vertices, "person", "name", "marko", "age", 29, "city", 
"Beijing");
+        assertContains(vertices, "person", "name", "vadas", "age", 27, "city", 
"Shanghai");
+    }
+
+    @Test
+    public void testKafkaCsvFormat() {
+        String[] args = new String[]{
+                "-f", configPath("kafka_format_csv/struct.json"),
+                "-s", configPath("kafka_format_csv/schema.groovy"),
+                "-g", GRAPH,
+                "-h", SERVER,
+                "-p", String.valueOf(PORT),
+                "--batch-insert-threads", "2",
+                "--test-mode", "true"
+        };
+
+        HugeGraphLoader.main(args);
+
+        List<Vertex> vertices = CLIENT.graph().listVertices();
+        Assert.assertEquals(2, vertices.size());
+
+        assertContains(vertices, "person", "name", "marko", "age", 29, "city", 
"Beijing");
+        assertContains(vertices, "person", "name", "vadas", "age", 27, "city", 
"Shanghai");
+    }
+
+    private static void mockVertexPersonFormatCsv() {
+        String topicName = "vertex-format-csv";
+        Object[] objects = {
+                "1,marko,29,Beijing",
+                "2,vadas,27,Shanghai"
+        };
+        KafkaUtil.createTopic(topicName);
+        commonMockTextData(objects, topicName);
+    }
+
+    private static void mockVertexPersonFormatText() {
+        String topicName = "vertex-format-text";
+        Object[] objects = {
+                "1\tmarko\t29\tBeijing",
+                "2\tvadas\t27\tShanghai"
+        };
+        KafkaUtil.createTopic(topicName);
+        commonMockTextData(objects, topicName);
+    }
+
+    private static void mockVertexPersonValueMapping() throws JsonProcessingException {
+        String topicName = "vertex-person-value-mapping";
+        String[] keys = {"id", "name", "age", "city"};
+        Object[][] objects = {
+                {1, "marko", 29, "1"},
+                {2, "vadas", 27, "2"}
+        };
+        KafkaUtil.createTopic(topicName);
+        commonMockData(keys, objects, topicName);
+    }
+
+    private static void mockVertexPersonData() throws JsonProcessingException {
+        String topicName = "vertex-person";
+        String[] keys = {"id", "name", "age", "city"};
+        Object[][] objects = {
+                {1, "marko", 29, "Beijing"},
+                {2, "vadas", 27, "HongKong"},
+                {3, "josh", 32, "Beijing"},
+                {4, "peter", 35, "Shanghai"},
+                {5, "peter", 26, "Wu,han"}
+        };
+        KafkaUtil.createTopic(topicName);
+        commonMockData(keys, objects, topicName);
+    }
+
+    private static void mockVertexSoftwareData() throws JsonProcessingException {
+        String topicName = "vertex-software";
+        String[] keys = {"id", "name", "lang", "price"};
+        Object[][] objects = {
+                {100, "lop", "java", 328.00},
+                {200, "ripple", "java", 199.67}
+        };
+        KafkaUtil.createTopic(topicName);
+        commonMockData(keys, objects, topicName);
+    }
+
+    private static void mockEdgeKnowsData() throws JsonProcessingException {
+        String topicName = "edge-knows";
+        String[] keys = {"id", "source_id", "target_id", "date", "weight"};
+        Object[][] objects = {
+                {1, 1, 2, "2016-01-10", 0.50},
+                {2, 1, 3, "2013-02-20", 1.00}
+        };
+        KafkaUtil.createTopic(topicName);
+        commonMockData(keys, objects, topicName);
+    }
+
+    private static void mockEdgeCreatedData() throws JsonProcessingException {
+        String topicName = "edge-created";
+        String[] keys = {"id", "source_id", "target_id", "date", "weight"};
+        Object[][] objects = {
+                {1, 1, 100, "2017-12-10", 0.40},
+                {2, 3, 100, "2009-11-11", 0.40},
+                {3, 3, 200, "2017-12-10", 1.00},
+                {4, 4, 100, "2017-03-24", 0.20}
+        };
+        KafkaUtil.createTopic(topicName);
+        commonMockData(keys, objects, topicName);
+    }
+
+    @NotNull
+    private static Producer<String, String> createKafkaProducer() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", KafkaUtil.getBootStrapServers());
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        Producer<String, String> producer = new KafkaProducer<>(props);
+        return producer;
+    }
+
+    private static void commonMockData(String[] keys, Object[][] objects, String topic)
+            throws JsonProcessingException {
+
+        Producer<String, String> producer = createKafkaProducer();
+
+        for (Object[] object : objects) {
+            Map<String, Object> map = new HashMap<>();
+            for (int i = 0; i < keys.length; i++) {
+                map.put(keys[i], object[i]);
+            }
+            ObjectMapper objectMapper = new ObjectMapper();
+            String value = objectMapper.writeValueAsString(map);
+            ProducerRecord<String, String> record = new 
ProducerRecord<>(topic, value);
+            producer.send(record);
+        }
+
+        producer.flush();
+        producer.close();
+    }
+
+    private static void commonMockTextData(Object[] objects, String topicName) {
+        Producer<String, String> producer = createKafkaProducer();
+
+        for (Object object : objects) {
+            ProducerRecord<String, String> record =
+                    new ProducerRecord<>(topicName, object.toString());
+            producer.send(record);
+        }
+
+        producer.flush();
+        producer.close();
+    }
+}
diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaUtil.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaUtil.java
new file mode 100644
index 00000000..d8f18626
--- /dev/null
+++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/KafkaUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.loader.test.functional;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+public class KafkaUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtil.class);
+
+    private static final DockerImageName KAFKA_DOCKER_IMAGE =
+            DockerImageName.parse("confluentinc/cp-kafka:6.2.1");
+
+    private static KafkaContainer kafkaContainer;
+
+    private static String bootStrapServers;
+
+    private static AdminClient adminClient;
+
+    public static void prepareEnv() {
+        startService();
+        createAdminClient();
+    }
+
+    public static void close() {
+        adminClient.close();
+        kafkaContainer.close();
+    }
+
+    private static void startService() {
+        kafkaContainer = new KafkaContainer(KAFKA_DOCKER_IMAGE);
+        kafkaContainer.setPortBindings(Arrays.asList("9092:9092", "9093:9093"));
+        kafkaContainer.waitingFor(Wait.defaultWaitStrategy());
+        kafkaContainer.start();
+        bootStrapServers = kafkaContainer.getBootstrapServers();
+        LOG.info("Kafka container started successfully, bootstrapServer: {}.", bootStrapServers);
+    }
+
+    private static void createAdminClient() {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+        adminClient = AdminClient.create(properties);
+    }
+
+    public static void createTopic(String topicName) {
+        int numPartitions = 1;
+        int replicationFactor = 1;
+        NewTopic newTopic =
+                new NewTopic(topicName, numPartitions, (short) replicationFactor);
+        adminClient.createTopics(Collections.singleton(newTopic));
+    }
+
+    public static String getBootStrapServers() {
+        return bootStrapServers;
+    }
+
+    public static AdminClient getAdminClient() {
+        return adminClient;
+    }
+}
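
Wired together, the test flow is: start the container, create a topic, publish records,
then run the loader against a kafka struct.json (CI invokes this via the new
"mvn test -P kafka" profile). A condensed sketch of that setup, reusing KafkaUtil from
this commit and the producer properties from KafkaLoadTest:

    // Hedged sketch of KafkaLoadTest's @BeforeClass wiring.
    KafkaUtil.prepareEnv();
    KafkaUtil.createTopic("vertex-person");

    Properties props = new Properties();
    props.put("bootstrap.servers", KafkaUtil.getBootStrapServers());
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        producer.send(new ProducerRecord<>("vertex-person",
                "{\"id\": 1, \"name\": \"marko\", \"age\": 29, \"city\": \"Beijing\"}"));
        producer.flush();
    }
    // ... HugeGraphLoader.main(args) with a kafka struct.json, then ...
    KafkaUtil.close();
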
diff --git a/hugegraph-loader/src/test/resources/kafka_customized_schema/schema.groovy b/hugegraph-loader/src/test/resources/kafka_customized_schema/schema.groovy
new file mode 100644
index 00000000..ed703574
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_customized_schema/schema.groovy
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("weight").asDouble().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("date").asText().ifNotExist().create();
+schema.propertyKey("price").asDouble().ifNotExist().create();
+schema.propertyKey("feel").asText().valueList().ifNotExist().create();
+schema.propertyKey("time").asText().valueSet().ifNotExist().create();
+
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age", 
"city").nullableKeys("age", "city").ifNotExist().create();
+schema.vertexLabel("software").useCustomizeNumberId().properties("name", 
"lang", "price").ifNotExist().create();
+
+schema.edgeLabel("knows").sourceLabel("person").targetLabel("person").properties("date",
 "weight").ifNotExist().create();
+schema.edgeLabel("created").sourceLabel("person").targetLabel("software").properties("date",
 "weight").ifNotExist().create();
+schema.edgeLabel("use").sourceLabel("person").targetLabel("software").properties("feel",
 "time").nullableKeys("feel", "time").ifNotExist().create();
diff --git a/hugegraph-loader/src/test/resources/kafka_customized_schema/struct.json b/hugegraph-loader/src/test/resources/kafka_customized_schema/struct.json
new file mode 100644
index 00000000..6e30520e
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_customized_schema/struct.json
@@ -0,0 +1,89 @@
+{
+  "version": "2.0",
+  "structs": [
+    {
+      "id": "1",
+      "input": {
+        "type": "kafka",
+        "bootstrap_server": "localhost:9093",
+        "topic": "vertex-person",
+        "from_beginning": true,
+        "group": "consumer_customize_schema",
+        "format": "JSON",
+        "early_stop": true
+      },
+      "vertices": [
+        {
+          "label": "person",
+          "id": "id",
+          "null_values": ["NULL"]
+        }
+      ],
+      "edges": []
+    },
+    {
+      "id": "2",
+      "input": {
+        "type": "KAFKA",
+        "bootstrap_server": "localhost:9093",
+        "topic": "vertex-software",
+        "from_beginning": true,
+        "group": "consumer_customize_schema",
+        "format": "JSON",
+        "early_stop": true,
+        "batch_size": 2
+      },
+      "vertices": [
+        {
+          "label": "software",
+          "id": "id"
+        }
+      ],
+      "edges": []
+    },
+    {
+      "id": "3",
+      "input": {
+        "type": "kafka",
+        "bootstrap_server": "localhost:9093",
+        "topic": "edge-knows",
+        "from_beginning": true,
+        "group": "consumer_customize_schema",
+        "format": "JSON",
+        "early_stop": true,
+        "batch_size": 2
+      },
+      "vertices": [],
+      "edges": [
+        {
+          "label": "knows",
+          "source": ["source_id"],
+          "target": ["target_id"],
+          "ignored": ["id"]
+        }
+      ]
+    },
+    {
+      "id": "4",
+      "input": {
+        "type": "kafka",
+        "bootstrap_server": "localhost:9093",
+        "topic": "edge-created",
+        "from_beginning": true,
+        "group": "consumer_customize_schema",
+        "format": "JSON",
+        "early_stop": true,
+        "batch_size": 2
+      },
+      "vertices": [],
+      "edges": [
+        {
+          "label": "created",
+          "source": ["source_id"],
+          "target": ["target_id"],
+          "ignored": ["id"]
+        }
+      ]
+    }
+  ]
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java b/hugegraph-loader/src/test/resources/kafka_format_csv/schema.groovy
similarity index 70%
copy from hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
copy to hugegraph-loader/src/test/resources/kafka_format_csv/schema.groovy
index ed54933f..305adf46 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++ b/hugegraph-loader/src/test/resources/kafka_format_csv/schema.groovy
@@ -14,14 +14,9 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
 
-package org.apache.hugegraph.loader.source;
-
-public enum SourceType {
-
-    FILE,
-
-    HDFS,
-
-    JDBC
-}
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age", 
"city").nullableKeys("age", "city").ifNotExist().create();
diff --git a/hugegraph-loader/src/test/resources/kafka_format_csv/struct.json b/hugegraph-loader/src/test/resources/kafka_format_csv/struct.json
new file mode 100644
index 00000000..963a3409
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_format_csv/struct.json
@@ -0,0 +1,26 @@
+{
+  "version": "2.0",
+  "structs": [
+    {
+      "id": "1",
+      "input": {
+        "type": "kafka",
+        "bootstrap_server": "localhost:9093",
+        "topic": "vertex-format-csv",
+        "from_beginning": true,
+        "group": "consumer_format_csv",
+        "format": "CSV",
+        "header": ["id", "name", "age", "city"],
+        "early_stop": true
+      },
+      "vertices": [
+        {
+          "label": "person",
+          "id": "id",
+          "null_values": ["NULL"]
+        }
+      ],
+      "edges": []
+    }
+  ]
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java b/hugegraph-loader/src/test/resources/kafka_format_not_support/schema.groovy
similarity index 56%
copy from hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
copy to hugegraph-loader/src/test/resources/kafka_format_not_support/schema.groovy
index ed54933f..784bda54 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++ b/hugegraph-loader/src/test/resources/kafka_format_not_support/schema.groovy
@@ -14,14 +14,12 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asText().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("price").asText().ifNotExist().create();
 
-package org.apache.hugegraph.loader.source;
-
-public enum SourceType {
-
-    FILE,
-
-    HDFS,
-
-    JDBC
-}
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age", 
"city").nullableKeys("name", "age", "city").ifNotExist().create();
+schema.vertexLabel("software").useCustomizeNumberId().properties("name", 
"lang", "price").nullableKeys("name", "lang", "price").ifNotExist().create();
diff --git a/hugegraph-loader/src/test/resources/kafka_format_not_support/struct.json b/hugegraph-loader/src/test/resources/kafka_format_not_support/struct.json
new file mode 100644
index 00000000..75526ed8
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_format_not_support/struct.json
@@ -0,0 +1,45 @@
+{
+  "version": "2.0",
+  "structs": [
+    {
+      "id": "1",
+      "input": {
+        "type": "kafka",
+        "bootstrap_server": "localhost:9093",
+        "topic": "vertex-person",
+        "from_beginning": true,
+        "group": "consumer_num_to_str",
+        "format": "EXCEL",
+        "early_stop": true
+      },
+      "vertices": [
+        {
+          "label": "person",
+          "id": "id",
+          "null_values": ["NULL"]
+        }
+      ],
+      "edges": []
+    },
+    {
+      "id": "2",
+      "input": {
+        "type": "KAFKA",
+        "bootstrap_server": "localhost:9093",
+        "topic": "vertex-software",
+        "from_beginning": true,
+        "group": "consumer_num_to_str",
+        "format": "JSON",
+        "early_stop": true,
+        "batch_size": 2
+      },
+      "vertices": [
+        {
+          "label": "software",
+          "id": "id"
+        }
+      ],
+      "edges": []
+    }
+  ]
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java b/hugegraph-loader/src/test/resources/kafka_format_text/schema.groovy
similarity index 70%
copy from hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
copy to hugegraph-loader/src/test/resources/kafka_format_text/schema.groovy
index ed54933f..305adf46 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++ b/hugegraph-loader/src/test/resources/kafka_format_text/schema.groovy
@@ -14,14 +14,9 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
 
-package org.apache.hugegraph.loader.source;
-
-public enum SourceType {
-
-    FILE,
-
-    HDFS,
-
-    JDBC
-}
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age", 
"city").nullableKeys("age", "city").ifNotExist().create();
diff --git a/hugegraph-loader/src/test/resources/kafka_format_text/struct.json b/hugegraph-loader/src/test/resources/kafka_format_text/struct.json
new file mode 100644
index 00000000..95811bb2
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_format_text/struct.json
@@ -0,0 +1,27 @@
+{
+  "version": "2.0",
+  "structs": [
+    {
+      "id": "1",
+      "input": {
+        "type": "kafka",
+        "bootstrap_server": "localhost:9093",
+        "topic": "vertex-format-text",
+        "from_beginning": true,
+        "group": "consumer_format_text",
+        "format": "TEXT",
+        "header": ["id", "name", "age", "city"],
+        "delimiter": "\t",
+        "early_stop": true
+      },
+      "vertices": [
+        {
+          "label": "person",
+          "id": "id",
+          "null_values": ["NULL"]
+        }
+      ],
+      "edges": []
+    }
+  ]
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java b/hugegraph-loader/src/test/resources/kafka_number_to_string/schema.groovy
similarity index 56%
copy from hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
copy to hugegraph-loader/src/test/resources/kafka_number_to_string/schema.groovy
index ed54933f..784bda54 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++ b/hugegraph-loader/src/test/resources/kafka_number_to_string/schema.groovy
@@ -14,14 +14,12 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asText().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("price").asText().ifNotExist().create();
 
-package org.apache.hugegraph.loader.source;
-
-public enum SourceType {
-
-    FILE,
-
-    HDFS,
-
-    JDBC
-}
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age", 
"city").nullableKeys("name", "age", "city").ifNotExist().create();
+schema.vertexLabel("software").useCustomizeNumberId().properties("name", 
"lang", "price").nullableKeys("name", "lang", "price").ifNotExist().create();
diff --git a/hugegraph-loader/src/test/resources/kafka_number_to_string/struct.json b/hugegraph-loader/src/test/resources/kafka_number_to_string/struct.json
new file mode 100644
index 00000000..9ead6a3b
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_number_to_string/struct.json
@@ -0,0 +1,45 @@
+{
+  "version": "2.0",
+  "structs": [
+    {
+      "id": "1",
+      "input": {
+        "type": "kafka",
+        "bootstrap_server": "localhost:9093",
+        "topic": "vertex-person",
+        "from_beginning": true,
+        "group": "consumer_num_to_str",
+        "format": "JSON",
+        "early_stop": true
+      },
+      "vertices": [
+        {
+          "label": "person",
+          "id": "id",
+          "null_values": ["NULL"]
+        }
+      ],
+      "edges": []
+    },
+    {
+      "id": "2",
+      "input": {
+        "type": "KAFKA",
+        "bootstrap_server": "localhost:9093",
+        "topic": "vertex-software",
+        "from_beginning": true,
+        "group": "consumer_num_to_str",
+        "format": "JSON",
+        "early_stop": true,
+        "batch_size": 2
+      },
+      "vertices": [
+        {
+          "label": "software",
+          "id": "id"
+        }
+      ],
+      "edges": []
+    }
+  ]
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java b/hugegraph-loader/src/test/resources/kafka_value_mapping/schema.groovy
similarity index 70%
copy from hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
copy to hugegraph-loader/src/test/resources/kafka_value_mapping/schema.groovy
index ed54933f..305adf46 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/source/SourceType.java
+++ b/hugegraph-loader/src/test/resources/kafka_value_mapping/schema.groovy
@@ -14,14 +14,9 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
 
-package org.apache.hugegraph.loader.source;
-
-public enum SourceType {
-
-    FILE,
-
-    HDFS,
-
-    JDBC
-}
+schema.vertexLabel("person").useCustomizeNumberId().properties("name", "age", 
"city").nullableKeys("age", "city").ifNotExist().create();
diff --git a/hugegraph-loader/src/test/resources/kafka_value_mapping/struct.json b/hugegraph-loader/src/test/resources/kafka_value_mapping/struct.json
new file mode 100644
index 00000000..332dbe64
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/kafka_value_mapping/struct.json
@@ -0,0 +1,31 @@
+{
+  "version": "2.0",
+  "structs": [
+    {
+      "id": "1",
+      "input": {
+        "type": "kafka",
+        "bootstrap_server": "localhost:9093",
+        "topic": "vertex-person-value-mapping",
+        "from_beginning": true,
+        "group": "consumer_value_mapping",
+        "format": "JSON",
+        "early_stop": true
+      },
+      "vertices": [
+        {
+          "label": "person",
+          "id": "id",
+          "null_values": ["NULL"],
+          "value_mapping": {
+            "city": {
+              "1": "Beijing",
+              "2": "Shanghai"
+            }
+          }
+        }
+      ],
+      "edges": []
+    }
+  ]
+}
