This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1807eb6c9 [Feature][Connector-V2][HTTP] Use json-path parsing (#3510)
1807eb6c9 is described below

commit 1807eb6c9587f5342583c885a3a22d2e4d987d40
Author: liugddx <[email protected]>
AuthorDate: Wed Nov 30 18:57:02 2022 +0800

    [Feature][Connector-V2][HTTP] Use json-path parsing (#3510)
    
    * add json-path dependency
    
    * add json path parsing
    
    * add e2e test
    
    * add e2e test
    
    * modify md
    
    * modify md
    
    * fix cv.
    
    * remove unnecessary code
    
    * fix ci error
    
    * add content json
    
    * fix some bug
    
    * fix md
    
    * resolve code conflicts
    
    * fix md
    
    * fix ci error
    
    * modify http.md
    
    * Resolve conflict
    
    * fix cr problem
    
    * Update Http.md
    
    add content_json
---
 docs/en/connector-v2/source/Http.md                | 184 +++++++++++++++++++--
 .../connector-http/connector-http-base/pom.xml     |  10 +-
 .../seatunnel/http/config/HttpConfig.java          |   9 +
 .../seatunnel/http/config/JsonField.java           |  36 ++++
 .../http/exception/HttpConnectorErrorCode.java     |  43 +++++
 .../seatunnel/http/source/HttpSource.java          |  22 ++-
 .../seatunnel/http/source/HttpSourceReader.java    | 108 +++++++++++-
 .../seatunnel/gitlab/source/GitlabSource.java      |   2 +-
 .../seatunnel/jira/source/JiraSource.java          |   2 +-
 .../seatunnel/klaviyo/source/KlaviyoSource.java    |   2 +-
 .../seatunnel/lemlist/source/LemlistSource.java    |   2 +-
 .../seatunnel/myhours/source/MyHoursSource.java    |   2 +-
 .../onesignal/source/OneSignalSource.java          |   2 +-
 .../e2e/connector/http/HttpContentJsonIT.java      |  76 +++++++++
 .../e2e/connector/http/HttpJsonPathIT.java         |  76 +++++++++
 .../test/resources/http_contentjson_to_assert.conf |  75 +++++++++
 .../test/resources/http_jsonpath_to_assert.conf    |  80 +++++++++
 .../resources/mockserver-contentjson-config.json   |  52 ++++++
 .../test/resources/mockserver-jsonpath-config.json |  52 ++++++
 19 files changed, 806 insertions(+), 29 deletions(-)

diff --git a/docs/en/connector-v2/source/Http.md 
b/docs/en/connector-v2/source/Http.md
index 59505daa7..045a70693 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -15,13 +15,15 @@ Used to read data from Http.
 - [ ] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
 
-##  Options
+## Options
 
 | name                        | type   | required | default value |
 | --------------------------- | ------ | -------- | ------------- |
 | url                         | String | Yes      | -             |
 | schema                      | Config | No       | -             |
 | schema.fields               | Config | No       | -             |
+| json_field                  | Config | No       | -             |
+| content_json                | String | No       | -             |
 | format                      | String | No       | json          |
 | method                      | String | No       | get           |
 | headers                     | Map    | No       | -             |
@@ -32,6 +34,7 @@ Used to read data from Http.
 | retry_backoff_multiplier_ms | int    | No       | 100           |
 | retry_backoff_max_ms        | int    | No       | 10000         |
 | common-options              |        | No       | -             |
+
 ### url [String]
 
 http request url
@@ -78,7 +81,11 @@ upstream data is the following:
 
 ```json
 
-{"code":  200, "data":  "get success", "success":  true}
+{
+  "code": 200,
+  "data": "get success",
+  "success": true
+}
 
 ```
 
@@ -87,11 +94,11 @@ you should assign schema as the following:
 ```hocon
 
 schema {
-    fields {
-        code = int
-        data = string
-        success = boolean
-    }
+  fields {
+    code = int
+    data = string
+    success = boolean
+  }
 }
 
 ```
@@ -108,7 +115,11 @@ upstream data is the following:
 
 ```json
 
-{"code":  200, "data":  "get success", "success":  true}
+{
+  "code": 200,
+  "data": "get success",
+  "success": true
+}
 
 ```
 
@@ -124,7 +135,142 @@ connector will generate data as the following:
 
 the schema fields of upstream data
 
-### common options 
+### content_json [String]
+
+This parameter can get some json data.If you only need the data in the 'book' 
section, configure `content_field = "$.store.book.*"`.
+
+If your return data looks something like this.
+
+```json
+{
+        "store": {
+          "book": [
+            {
+              "category": "reference",
+              "author": "Nigel Rees",
+              "title": "Sayings of the Century",
+              "price": 8.95
+            },
+            {
+              "category": "fiction",
+              "author": "Evelyn Waugh",
+              "title": "Sword of Honour",
+              "price": 12.99
+            }
+          ],
+          "bicycle": {
+            "color": "red",
+            "price": 19.95
+          }
+        },
+        "expensive": 10
+      }
+```
+You can configure `content_field = "$.store.book.*"` and the result returned 
looks like this:
+
+```json
+[
+            {
+              "category": "reference",
+              "author": "Nigel Rees",
+              "title": "Sayings of the Century",
+              "price": 8.95
+            },
+            {
+              "category": "fiction",
+              "author": "Evelyn Waugh",
+              "title": "Sword of Honour",
+              "price": 12.99
+            }
+          ]
+```
+Then you can get the desired result with a simpler schema,like
+
+```hocon
+Http {
+  url = "http://mockserver:1080/contentjson/mock";
+  method = "GET"
+  format = "json"
+  content_field = "$.store.book.*"
+  schema = {
+    fields {
+      category = string
+      author = string
+      title = string
+      price = string
+    }
+  }
+}
+```
+
+Here is an example:
+
+- Test data can be found at this link 
[mockserver-contentjson-config.json](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-contentjson-config.json)
+- See this link for task configuration 
[http_contentjson_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_contentjson_to_assert.conf).
+
+
+### json_field [Config]
+
+This parameter helps you configure the schema,so this parameter must be used 
with schema.
+
+If your data looks something like this:
+
+```json
+{
+  "store": {
+    "book": [
+      {
+        "category": "reference",
+        "author": "Nigel Rees",
+        "title": "Sayings of the Century",
+        "price": 8.95
+      },
+      {
+        "category": "fiction",
+        "author": "Evelyn Waugh",
+        "title": "Sword of Honour",
+        "price": 12.99
+      }
+    ],
+    "bicycle": {
+      "color": "red",
+      "price": 19.95
+    }
+  },
+  "expensive": 10
+}
+```
+
+You can get the contents of 'book' by configuring the task as follows:
+
+```hocon
+source {
+  Http {
+    url = "http://mockserver:1080/jsonpath/mock";
+    method = "GET"
+    format = "json"
+    json_field = {
+      category = "$.store.book[*].category"
+      author = "$.store.book[*].author"
+      title = "$.store.book[*].title"
+      price = "$.store.book[*].price"
+    }
+    schema = {
+      fields {
+        category = string
+        author = string
+        title = string
+        price = string
+      }
+    }
+  }
+}
+```
+
+- Test data can be found at this link 
[mockserver-jsonpath-config.json](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-jsonpath-config.json)
+- See this link for task configuration 
[http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf).
+
+### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details
 
@@ -134,15 +280,15 @@ simple:
 
 ```hocon
 Http {
-    url = "https://tyrantlucifer.com/api/getDemoData";
-    schema {
-      fields {
-        code = int
-        message = string
-        data = string
-        ok = boolean
-      }
+  url = "https://tyrantlucifer.com/api/getDemoData";
+  schema {
+    fields {
+      code = int
+      message = string
+      data = string
+      ok = boolean
     }
+  }
 }
 ```
 
@@ -151,3 +297,7 @@ Http {
 ### 2.2.0-beta 2022-09-26
 
 - Add Http Source Connector
+
+### new version
+
+- [Feature][Connector-V2][HTTP] Use json-path parsing 
([3510](https://github.com/apache/incubator-seatunnel/pull/3510))
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml 
b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
index 8e33fdccd..ca8b9a106 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
@@ -28,11 +28,12 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>connector-http-base</artifactId>
-    
+
     <properties>
         <httpclient.version>4.5.13</httpclient.version>
         <httpcore.version>4.4.4</httpcore.version>
         <guava-retrying.version>2.0.0</guava-retrying.version>
+        <json-path.version>2.7.0</json-path.version>
     </properties>
 
     <dependencies>
@@ -64,5 +65,10 @@
             <artifactId>guava-retrying</artifactId>
             <version>${guava-retrying.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <version>${json-path.version}</version>
+        </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 2a2e4058c..0e6d2d9f0 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -67,6 +67,15 @@ public class HttpConfig {
         .defaultValue(DEFAULT_RETRY_BACKOFF_MAX_MS)
         .withDescription("The maximum retry-backoff times(millis) if request 
http failed");
 
+    public static final Option<JsonField> JSON_FIELD = 
Options.key("json_field")
+        .objectType(JsonField.class)
+        .noDefaultValue()
+        .withDescription("SeaTunnel json field.When partial json data is 
required, this parameter can be configured to obtain data");
+    public static final Option<String> CONTENT_FIELD = 
Options.key("content_field")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("SeaTunnel content field.This parameter can get some 
json data, and there is no need to configure each field separately.");
+
     public enum ResponseFormat {
         JSON("json");
 
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/JsonField.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/JsonField.java
new file mode 100644
index 000000000..574ac51db
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/JsonField.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.config;
+
+import org.apache.seatunnel.api.configuration.util.OptionMark;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Builder
+public class JsonField implements Serializable {
+    private static final long serialVersionUID = -1L;
+
+    @OptionMark(description = "The json fields map")
+    private Map<String, String> fields;
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/exception/HttpConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/exception/HttpConnectorErrorCode.java
new file mode 100644
index 000000000..824163915
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/exception/HttpConnectorErrorCode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum HttpConnectorErrorCode implements SeaTunnelErrorCode {
+
+    FIELD_DATA_IS_INCONSISTENT("HTTP-01", "The field data is inconsistent");
+
+    private final String code;
+    private final String description;
+
+    HttpConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index c2a4da28e..c4fbab18f 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -31,16 +31,19 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
 import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 
 import com.google.auto.service.AutoService;
 
@@ -50,6 +53,8 @@ import java.util.Locale;
 public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     protected final HttpParameter httpParameter = new HttpParameter();
     protected SeaTunnelRowType rowType;
+    protected JsonField jsonField;
+    protected String contentField;
     protected JobContext jobContext;
     protected DeserializationSchema<SeaTunnelRow> deserializationSchema;
 
@@ -68,8 +73,8 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HttpConfig.URL.key());
         if (!result.isSuccess()) {
             throw new 
HttpConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
         this.httpParameter.buildWithConfig(pluginConfig);
         buildSchemaWithConfig(pluginConfig);
@@ -88,6 +93,12 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
             switch (format) {
                 case JSON:
                     this.deserializationSchema = new 
JsonDeserializationSchema(false, false, rowType);
+                    if (pluginConfig.hasPath(HttpConfig.JSON_FIELD.key())) {
+                        jsonField = 
getJsonField(pluginConfig.getConfig(HttpConfig.JSON_FIELD.key()));
+                    }
+                    if (pluginConfig.hasPath(HttpConfig.CONTENT_FIELD.key())) {
+                        contentField = 
pluginConfig.getString(HttpConfig.CONTENT_FIELD.key());
+                    }
                     break;
                 default:
                     // TODO: use format SPI
@@ -112,6 +123,11 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext)
         throws Exception {
-        return new HttpSourceReader(this.httpParameter, readerContext, 
this.deserializationSchema);
+        return new HttpSourceReader(this.httpParameter, readerContext, 
this.deserializationSchema, jsonField, contentField);
+    }
+
+    private JsonField getJsonField(Config jsonFieldConf) {
+        ConfigRenderOptions options = ConfigRenderOptions.concise();
+        return 
JsonField.builder().fields(JsonUtils.toMap(jsonFieldConf.root().render(options))).build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index c36f526fa..a08d00b05 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -21,16 +21,28 @@ import 
org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import 
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
 import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
+import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
 
 import com.google.common.base.Strings;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.ReadContext;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 @Slf4j
@@ -39,11 +51,19 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
     protected final HttpParameter httpParameter;
     protected HttpClientProvider httpClient;
     private final DeserializationCollector deserializationCollector;
+    private static final Option[] DEFAULT_OPTIONS =
+        {Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, 
Option.DEFAULT_PATH_LEAF_TO_NULL};
+    private JsonPath[] jsonPaths;
+    private final JsonField jsonField;
+    private final String contentJson;
+    private final Configuration jsonConfiguration = 
Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
 
-    public HttpSourceReader(HttpParameter httpParameter, 
SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> 
deserializationSchema) {
+    public HttpSourceReader(HttpParameter httpParameter, 
SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> 
deserializationSchema, JsonField jsonField, String contentJson) {
         this.context = context;
         this.httpParameter = httpParameter;
         this.deserializationCollector = new 
DeserializationCollector(deserializationSchema);
+        this.jsonField = jsonField;
+        this.contentJson = contentJson;
     }
 
     @Override
@@ -65,6 +85,13 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
             if (HttpResponse.STATUS_OK == response.getCode()) {
                 String content = response.getContent();
                 if (!Strings.isNullOrEmpty(content)) {
+                    if (contentJson != null) {
+                        content = 
JsonUtils.stringToJsonNode(getPartOfJson(content)).toString();
+                    }
+                    if (jsonField != null) {
+                        this.initJsonPath(jsonField);
+                        content = 
JsonUtils.toJsonNode(parseToMap(decodeJSON(content), jsonField)).toString();
+                    }
                     deserializationCollector.collect(content.getBytes(), 
output);
                 }
                 return;
@@ -84,4 +111,83 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
             }
         }
     }
+
+    private List<Map<String, String>> parseToMap(List<List<String>> datas, 
JsonField jsonField) {
+        List<Map<String, String>> decodeDatas = new 
ArrayList<>(datas.get(0).size());
+        String[] keys = jsonField.getFields().keySet().toArray(new String[]{});
+        for (int index = 0; index < jsonField.getFields().size(); index++) {
+            int finalIndex = index;
+            datas.get(index).forEach(field -> {
+                if (decodeDatas.isEmpty() || decodeDatas.size() != 
datas.get(0).size()) {
+                    Map<String, String> decodeData = new 
HashMap<>(jsonField.getFields().size());
+                    decodeData.put(keys[finalIndex], field);
+                    decodeDatas.add(decodeData);
+                } else {
+                    decodeDatas.forEach(decodeData -> {
+                        decodeData.put(keys[finalIndex], field);
+                    });
+                }
+            });
+        }
+
+        return decodeDatas;
+    }
+
+    private List<List<String>> decodeJSON(String data) {
+        ReadContext jsonReadContext = 
JsonPath.using(jsonConfiguration).parse(data);
+        List<List<String>> results = new ArrayList<>(jsonPaths.length);
+        for (JsonPath path : jsonPaths) {
+            List<String> result = jsonReadContext.read(path);
+            results.add(result);
+        }
+        for (int i = 1; i < results.size(); i++) {
+            List<?> result0 = results.get(0);
+            List<?> result = results.get(i);
+            if (result0.size() != result.size()) {
+                throw new HttpConnectorException(
+                    HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
+                    String.format(
+                        "[%s](%d) and [%s](%d) the number of parsing records 
is inconsistent.",
+                        jsonPaths[0].getPath(), result0.size(), 
jsonPaths[i].getPath(), result.size()));
+            }
+        }
+
+        return dataFlip(results);
+    }
+
+    private String getPartOfJson(String data) {
+        ReadContext jsonReadContext = 
JsonPath.using(jsonConfiguration).parse(data);
+        return 
JsonUtils.toJsonString(jsonReadContext.read(JsonPath.compile(contentJson)));
+    }
+
+    private List<List<String>> dataFlip(List<List<String>> results) {
+
+        List<List<String>> datas = new ArrayList<>();
+        for (int i = 0; i < results.size(); i++) {
+            List<String> result = results.get(i);
+            if (i == 0) {
+                for (Object o : result) {
+                    String val = o == null ? null : o.toString();
+                    List<String> row = new ArrayList<>(jsonPaths.length);
+                    row.add(val);
+                    datas.add(row);
+                }
+            } else {
+                for (int j = 0; j < result.size(); j++) {
+                    Object o = result.get(j);
+                    String val = o == null ? null : o.toString();
+                    List<String> row = datas.get(j);
+                    row.add(val);
+                }
+            }
+        }
+        return datas;
+    }
+
+    private void initJsonPath(JsonField jsonField) {
+        jsonPaths = new JsonPath[jsonField.getFields().size()];
+        for (int index = 0; index < jsonField.getFields().keySet().size(); 
index++) {
+            jsonPaths[index] = 
JsonPath.compile(jsonField.getFields().keySet().toArray(new String[]{})[index]);
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java
index 70ab55a80..88e535aad 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java
@@ -72,7 +72,7 @@ public class GitlabSource extends HttpSource {
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new HttpSourceReader(this.gitlabSourceParameter, readerContext, 
this.deserializationSchema);
+        return new HttpSourceReader(this.gitlabSourceParameter, readerContext, 
this.deserializationSchema, jsonField, contentField);
     }
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java
index 6d112de4c..45c8def46 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java
@@ -71,6 +71,6 @@ public class JiraSource extends HttpSource {
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new HttpSourceReader(this.jiraSourceParameter, readerContext, 
this.deserializationSchema);
+        return new HttpSourceReader(this.jiraSourceParameter, readerContext, 
this.deserializationSchema, jsonField, contentField);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java
index 02a7973a5..9c294004d 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java
@@ -61,6 +61,6 @@ public class KlaviyoSource extends HttpSource {
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new HttpSourceReader(this.klaviyoSourceParameter, 
readerContext, this.deserializationSchema);
+        return new HttpSourceReader(this.klaviyoSourceParameter, 
readerContext, this.deserializationSchema, jsonField, contentField);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java
index 53b27f870..a6dff3612 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java
@@ -64,6 +64,6 @@ public class LemlistSource extends HttpSource {
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new HttpSourceReader(this.lemlistSourceParameter, 
readerContext, this.deserializationSchema);
+        return new HttpSourceReader(this.lemlistSourceParameter, 
readerContext, this.deserializationSchema, jsonField, contentField);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
index e6d46d1e6..c75c8ef03 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
@@ -72,7 +72,7 @@ public class MyHoursSource extends HttpSource {
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new HttpSourceReader(this.myHoursSourceParameter, 
readerContext, this.deserializationSchema);
+        return new HttpSourceReader(this.myHoursSourceParameter, 
readerContext, this.deserializationSchema, jsonField, contentField);
     }
 
     private String getAccessToken(Config pluginConfig){
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java
index c59687d75..f5b47d487 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java
@@ -56,6 +56,6 @@ public class OneSignalSource extends HttpSource {
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new HttpSourceReader(this.oneSignalSourceParameter, 
readerContext, this.deserializationSchema);
+        return new HttpSourceReader(this.oneSignalSourceParameter, 
readerContext, this.deserializationSchema, jsonField, contentField);
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpContentJsonIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpContentJsonIT.java
new file mode 100644
index 000000000..5fab2dd7f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpContentJsonIT.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.http;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class HttpContentJsonIT extends TestSuiteBase implements TestResource {
+
+    private static final String IMAGE = "mockserver/mockserver:5.14.0";
+
+    private GenericContainer<?> mockserverContainer;
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+        this.mockserverContainer = new 
GenericContainer<>(DockerImageName.parse(IMAGE))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("mockserver")
+            .withExposedPorts(1080)
+            .withCopyFileToContainer(MountableFile.forHostPath(new 
File(HttpContentJsonIT.class.getResource(
+                    
"/mockserver-contentjson-config.json").getPath()).getAbsolutePath()),
+                "/tmp/mockserver-contentjson-config.json")
+            .withEnv("MOCKSERVER_INITIALIZATION_JSON_PATH", 
"/tmp/mockserver-contentjson-config.json")
+            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+            .waitingFor(new 
HttpWaitStrategy().forPath("/").forStatusCode(404));
+        Startables.deepStart(Stream.of(mockserverContainer)).join();
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (mockserverContainer != null) {
+            mockserverContainer.stop();
+        }
+    }
+
+    @TestTemplate
+    public void testHttpContentJsonSourceToAssertSink(TestContainer container) 
throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/http_contentjson_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpJsonPathIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpJsonPathIT.java
new file mode 100644
index 000000000..704407027
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpJsonPathIT.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.http;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class HttpJsonPathIT extends TestSuiteBase implements TestResource {
+
+    private static final String IMAGE = "mockserver/mockserver:5.14.0";
+
+    private GenericContainer<?> mockserverContainer;
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+        this.mockserverContainer = new 
GenericContainer<>(DockerImageName.parse(IMAGE))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("mockserver")
+            .withExposedPorts(1080)
+            .withCopyFileToContainer(MountableFile.forHostPath(new 
File(HttpJsonPathIT.class.getResource(
+                    
"/mockserver-jsonpath-config.json").getPath()).getAbsolutePath()),
+                "/tmp/mockserver-jsonpath-config.json")
+            .withEnv("MOCKSERVER_INITIALIZATION_JSON_PATH", 
"/tmp/mockserver-jsonpath-config.json")
+            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+            .waitingFor(new 
HttpWaitStrategy().forPath("/").forStatusCode(404));
+        Startables.deepStart(Stream.of(mockserverContainer)).join();
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (mockserverContainer != null) {
+            mockserverContainer.stop();
+        }
+    }
+
+    @TestTemplate
+    public void testHttpJsonPathSourceToAssertSink(TestContainer container) 
throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/http_jsonpath_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_contentjson_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_contentjson_to_assert.conf
new file mode 100644
index 000000000..410df2263
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_contentjson_to_assert.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Http {
+    url = "http://mockserver:1080/contentjson/mock";
+    method = "GET"
+    format = "json"
+    content_field = "$.store.book.*"
+    schema = {
+      fields {
+        category = string
+        author = string
+        title = string
+        price = string
+      }
+    }
+  }
+}
+
+sink {
+  Console {}
+  Assert {
+    rules {
+      field_rules = [
+        {
+          field_name = category
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = author
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = title
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf
new file mode 100644
index 000000000..7fd7e93ca
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Http {
+    url = "http://mockserver:1080/jsonpath/mock";
+    method = "GET"
+    format = "json"
+    json_field = {
+      category = "$.store.book[*].category"
+      author = "$.store.book[*].author"
+      title = "$.store.book[*].title"
+      price = "$.store.book[*].price"
+    }
+    schema = {
+      fields {
+        category = string
+        author = string
+        title = string
+        price = string
+      }
+    }
+  }
+}
+
+sink {
+  Console {}
+  Assert {
+    rules {
+      field_rules = [
+        {
+          field_name = category
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = author
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = title
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-contentjson-config.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-contentjson-config.json
new file mode 100644
index 000000000..a6a9369e2
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-contentjson-config.json
@@ -0,0 +1,52 @@
+// 
https://www.mock-server.com/mock_server/getting_started.html#request_matchers
+
+[
+  {
+    "httpRequest": {
+      "method" : "GET",
+      "path": "/contentjson/mock"
+    },
+    "httpResponse": {
+      "body": {
+        "store": {
+          "book": [
+            {
+              "category": "reference",
+              "author": "Nigel Rees",
+              "title": "Sayings of the Century",
+              "price": 8.95
+            },
+            {
+              "category": "fiction",
+              "author": "Evelyn Waugh",
+              "title": "Sword of Honour",
+              "price": 12.99
+            },
+            {
+              "category": "fiction",
+              "author": "Herman Melville",
+              "title": "Moby Dick",
+              "isbn": "0-553-21311-3",
+              "price": 8.99
+            },
+            {
+              "category": "fiction",
+              "author": "J. R. R. Tolkien",
+              "title": "The Lord of the Rings",
+              "isbn": "0-395-19395-8",
+              "price": 22.99
+            }
+          ],
+          "bicycle": {
+            "color": "red",
+            "price": 19.95
+          }
+        },
+        "expensive": 10
+      },
+      "headers": {
+        "Content-Type": "application/json"
+      }
+    }
+  }
+]
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-jsonpath-config.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-jsonpath-config.json
new file mode 100644
index 000000000..5993c03d6
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-jsonpath-config.json
@@ -0,0 +1,52 @@
+// 
https://www.mock-server.com/mock_server/getting_started.html#request_matchers
+
+[
+  {
+    "httpRequest": {
+      "method" : "GET",
+      "path": "/jsonpath/mock"
+    },
+    "httpResponse": {
+      "body": {
+        "store": {
+          "book": [
+            {
+              "category": "reference",
+              "author": "Nigel Rees",
+              "title": "Sayings of the Century",
+              "price": 8.95
+            },
+            {
+              "category": "fiction",
+              "author": "Evelyn Waugh",
+              "title": "Sword of Honour",
+              "price": 12.99
+            },
+            {
+              "category": "fiction",
+              "author": "Herman Melville",
+              "title": "Moby Dick",
+              "isbn": "0-553-21311-3",
+              "price": 8.99
+            },
+            {
+              "category": "fiction",
+              "author": "J. R. R. Tolkien",
+              "title": "The Lord of the Rings",
+              "isbn": "0-395-19395-8",
+              "price": 22.99
+            }
+          ],
+          "bicycle": {
+            "color": "red",
+            "price": 19.95
+          }
+        },
+        "expensive": 10
+      },
+      "headers": {
+        "Content-Type": "application/json"
+      }
+    }
+  }
+]

Reply via email to