This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bb180b2988 [Feature][Connector-V2] HTTP supports page increase #5477
(#5561)
bb180b2988 is described below
commit bb180b298846a0f7214d07ee9bcb66a900e368f5
Author: xiaofan2012 <[email protected]>
AuthorDate: Thu Oct 26 17:47:05 2023 +0800
[Feature][Connector-V2] HTTP supports page increase #5477 (#5561)
---------
Co-authored-by: xiaofan2022 <[email protected]>
Co-authored-by: Eric <[email protected]>
---
docs/en/connector-v2/source/Http.md | 33 +++++
.../seatunnel/http/config/HttpConfig.java | 19 +++
.../connectors/seatunnel/http/config/PageInfo.java | 35 +++++
.../seatunnel/http/source/HttpSource.java | 30 ++++-
.../seatunnel/http/source/HttpSourceReader.java | 116 +++++++++++++----
.../seatunnel/e2e/connector/http/HttpIT.java | 7 +
.../resources/http_page_increase_no_page_num.conf | 85 ++++++++++++
.../resources/http_page_increase_page_num.conf | 85 ++++++++++++
.../src/test/resources/mockserver-config.json | 144 +++++++++++++++++++++
9 files changed, 529 insertions(+), 25 deletions(-)
diff --git a/docs/en/connector-v2/source/Http.md
b/docs/en/connector-v2/source/Http.md
index f3e6a221bb..0c01be813e 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -48,6 +48,10 @@ They can be downloaded via install-plugin.sh or from the
Maven central repositor
| schema | Config | No | - | Http and
seatunnel data structure mapping
|
| schema.fields | Config | No | - | The schema
fields of upstream data
|
| json_field | Config | No | - | This parameter
helps you configure the schema,so this parameter must be used with schema.
|
+| pageing | Config | No | - | This parameter
is used for paging queries
|
+| pageing.page_field | String | No | - | This parameter
is used to specify the page field name in the request parameter
|
+| pageing.total_page_size | Int | No | - | This parameter
is used to control the total number of pages
|
+| pageing.batch_size | Int | No | - | The batch size
returned per request is used to determine whether to continue when the total
number of pages is unknown |
| content_json | String | No | - | This parameter
can get some json data.If you only need the data in the 'book' section,
configure `content_field = "$.store.book.*"`. |
| format | String | No | json | The format of
upstream data, now only support `json` `text`, default `json`.
|
| method | String | No | get | Http request
method, only supports GET, POST method.
|
@@ -310,6 +314,35 @@ source {
- Test data can be found at this link
[mockserver-config.json](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json)
- See this link for task configuration
[http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf).
+### pageing
+
+```hocon
+source {
+ Http {
+ url = "http://localhost:8080/mock/queryData"
+ method = "GET"
+ format = "json"
+ params={
+ page: "${page}"
+ }
+ content_field = "$.data.*"
+ pageing={
+ total_page_size=20
+ page_field=page
+ #when don't know the total_page_size use batch_size if read
size<batch_size finish ,otherwise continue
+ #batch_size=10
+ }
+ schema = {
+ fields {
+ name = string
+ age = string
+ }
+ }
+ }
+}
+
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index d57f7ad8d3..2a68249b86 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -29,6 +29,25 @@ public class HttpConfig {
public static final boolean DEFAULT_ENABLE_MULTI_LINES = false;
public static final Option<String> URL =
Options.key("url").stringType().noDefaultValue().withDescription("Http request
url");
+ public static final Option<Long> TOTAL_PAGE_SIZE =
+ Options.key("total_page_size")
+ .longType()
+ .defaultValue(0L)
+ .withDescription("total page size");
+ public static final Option<Integer> BATCH_SIZE =
+ Options.key("batch_size")
+ .intType()
+ .defaultValue(100)
+ .withDescription(
+ "the batch size returned per request is used to
determine whether to continue when the total number of pages is unknown");
+ public static final Option<String> PAGE_FIELD =
+ Options.key("page_field")
+ .stringType()
+ .defaultValue("page")
+ .withDescription(
+ "this parameter is used to specify the page field
name in the request parameter");
+ public static final Option<Map<String, String>> PAGEING =
+
Options.key("pageing").mapType().noDefaultValue().withDescription("pageing");
public static final Option<HttpRequestMethod> METHOD =
Options.key("method")
.enumType(HttpRequestMethod.class)
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
new file mode 100644
index 0000000000..a5c7061347
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.http.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.io.Serializable;
+
+@Setter
+@Getter
+@ToString
+public class PageInfo implements Serializable {
+
+ private Long totalPageSize;
+
+ private Integer batchSize;
+ private String pageField;
+ private Long pageIndex;
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 8e8311b650..e0a30ad061 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -43,6 +43,7 @@ import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReader
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
+import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
import
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
@@ -53,6 +54,7 @@ import java.util.Locale;
@AutoService(SeaTunnelSource.class)
public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
protected final HttpParameter httpParameter = new HttpParameter();
+ protected PageInfo pageInfo;
protected SeaTunnelRowType rowType;
protected JsonField jsonField;
protected String contentField;
@@ -83,6 +85,31 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
this.httpParameter.buildWithConfig(pluginConfig);
buildSchemaWithConfig(pluginConfig);
+ buildPagingWithConfig(pluginConfig);
+ }
+
+ private void buildPagingWithConfig(Config pluginConfig) {
+ if (pluginConfig.hasPath(HttpConfig.PAGEING.key())) {
+ pageInfo = new PageInfo();
+ Config pageConfig =
pluginConfig.getConfig(HttpConfig.PAGEING.key());
+ if (pageConfig.hasPath(HttpConfig.TOTAL_PAGE_SIZE.key())) {
+
pageInfo.setTotalPageSize(pageConfig.getLong(HttpConfig.TOTAL_PAGE_SIZE.key()));
+ }
+ if (pageConfig.hasPath(HttpConfig.TOTAL_PAGE_SIZE.key())) {
+
pageInfo.setTotalPageSize(pageConfig.getLong(HttpConfig.TOTAL_PAGE_SIZE.key()));
+ } else {
+
pageInfo.setTotalPageSize(HttpConfig.TOTAL_PAGE_SIZE.defaultValue());
+ }
+
+ if (pageConfig.hasPath(HttpConfig.BATCH_SIZE.key())) {
+
pageInfo.setBatchSize(pageConfig.getInt(HttpConfig.BATCH_SIZE.key()));
+ } else {
+ pageInfo.setBatchSize(HttpConfig.BATCH_SIZE.defaultValue());
+ }
+ if (pageConfig.hasPath(HttpConfig.PAGE_FIELD.key())) {
+
pageInfo.setPageField(pageConfig.getString(HttpConfig.PAGE_FIELD.key()));
+ }
+ }
}
protected void buildSchemaWithConfig(Config pluginConfig) {
@@ -141,7 +168,8 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
readerContext,
this.deserializationSchema,
jsonField,
- contentField);
+ contentField,
+ pageInfo);
}
private JsonField getJsonField(Config jsonFieldConf) {
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 1b9969a237..3c4669659f 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
+import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
import
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
@@ -36,6 +37,7 @@ import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ReadContext;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
@@ -46,8 +48,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
@Slf4j
+@Setter
public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
protected final SingleSplitReaderContext context;
protected final HttpParameter httpParameter;
@@ -61,6 +65,8 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
private final String contentJson;
private final Configuration jsonConfiguration =
Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
+ private boolean noMoreElementFlag = true;
+ private Optional<PageInfo> pageInfoOptional = Optional.empty();
public HttpSourceReader(
HttpParameter httpParameter,
@@ -75,6 +81,21 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
this.contentJson = contentJson;
}
+ public HttpSourceReader(
+ HttpParameter httpParameter,
+ SingleSplitReaderContext context,
+ DeserializationSchema<SeaTunnelRow> deserializationSchema,
+ JsonField jsonField,
+ String contentJson,
+ PageInfo pageInfo) {
+ this.context = context;
+ this.httpParameter = httpParameter;
+ this.deserializationCollector = new
DeserializationCollector(deserializationSchema);
+ this.jsonField = jsonField;
+ this.contentJson = contentJson;
+ this.pageInfoOptional = Optional.ofNullable(pageInfo);
+ }
+
@Override
public void open() {
httpClient = new HttpClientProvider(httpParameter);
@@ -87,40 +108,72 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
}
}
- @Override
- public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
- try {
- HttpResponse response =
- httpClient.execute(
- this.httpParameter.getUrl(),
- this.httpParameter.getMethod().getMethod(),
- this.httpParameter.getHeaders(),
- this.httpParameter.getParams(),
- this.httpParameter.getBody());
- if (HttpResponse.STATUS_OK == response.getCode()) {
- String content = response.getContent();
- if (!Strings.isNullOrEmpty(content)) {
- if (this.httpParameter.isEnableMultilines()) {
- StringReader stringReader = new StringReader(content);
- BufferedReader bufferedReader = new
BufferedReader(stringReader);
- String lineStr;
- while ((lineStr = bufferedReader.readLine()) != null) {
- collect(output, lineStr);
- }
- } else {
- collect(output, content);
+ public void pollAndCollectData(Collector<SeaTunnelRow> output) throws
Exception {
+ HttpResponse response =
+ httpClient.execute(
+ this.httpParameter.getUrl(),
+ this.httpParameter.getMethod().getMethod(),
+ this.httpParameter.getHeaders(),
+ this.httpParameter.getParams(),
+ this.httpParameter.getBody());
+ if (HttpResponse.STATUS_OK == response.getCode()) {
+ String content = response.getContent();
+ if (!Strings.isNullOrEmpty(content)) {
+ if (this.httpParameter.isEnableMultilines()) {
+ StringReader stringReader = new StringReader(content);
+ BufferedReader bufferedReader = new
BufferedReader(stringReader);
+ String lineStr;
+ while ((lineStr = bufferedReader.readLine()) != null) {
+ collect(output, lineStr);
}
+ } else {
+ collect(output, content);
}
- return;
}
+ log.info(
+ "http client execute success request param:[{}], http
response status code:[{}], content:[{}]",
+ httpParameter.getParams(),
+ response.getCode(),
+ response.getContent());
+ } else {
log.error(
"http client execute exception, http response status
code:[{}], content:[{}]",
response.getCode(),
response.getContent());
+ }
+ }
+
+ private void updateRequestParam(PageInfo pageInfo) {
+ if (this.httpParameter.getParams() == null) {
+ httpParameter.setParams(new HashMap<>());
+ }
+ this.httpParameter
+ .getParams()
+ .put(pageInfo.getPageField(),
pageInfo.getPageIndex().toString());
+ }
+
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+ try {
+ if (pageInfoOptional.isPresent()) {
+ noMoreElementFlag = false;
+ Long pageIndex = 1L;
+ while (!noMoreElementFlag) {
+ PageInfo info = pageInfoOptional.get();
+ // increment page
+ info.setPageIndex(pageIndex);
+ // set request param
+ updateRequestParam(info);
+ pollAndCollectData(output);
+ pageIndex += 1;
+ }
+ } else {
+ pollAndCollectData(output);
+ }
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
- if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ if (Boundedness.BOUNDED.equals(context.getBoundedness()) &&
noMoreElementFlag) {
// signal to the source that we have reached the end of the
data.
log.info("Closed the bounded http source");
context.signalNoMoreElement();
@@ -140,6 +193,21 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
this.initJsonPath(jsonField);
data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data),
jsonField)).toString();
}
+ // page increase
+ if (pageInfoOptional.isPresent()) {
+ // Determine whether the task is completed by specifying the
presence of the 'total
+ // page' field
+ PageInfo pageInfo = pageInfoOptional.get();
+ if (pageInfo.getTotalPageSize() > 0) {
+ noMoreElementFlag = pageInfo.getPageIndex() >=
pageInfo.getTotalPageSize();
+ } else {
+ // no 'total page' configured
+ int readSize = JsonUtils.stringToJsonNode(data).size();
+ // if read size < BatchSize : read finish
+ // if read size = BatchSize : read next page.
+ noMoreElementFlag = readSize < pageInfo.getBatchSize();
+ }
+ }
deserializationCollector.collect(data.getBytes(), output);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index d65617bb55..bd85ed876e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -145,6 +145,13 @@ public class HttpIT extends TestSuiteBase implements
TestResource {
Container.ExecResult execResult14 =
container.executeJob("/http_jsonrequestbody_to_assert.conf");
Assertions.assertEquals(0, execResult14.getExitCode());
+
+ Container.ExecResult execResult15 =
+ container.executeJob("/http_page_increase_page_num.conf");
+ Assertions.assertEquals(0, execResult15.getExitCode());
+ Container.ExecResult execResult16 =
+ container.executeJob("/http_page_increase_no_page_num.conf");
+ Assertions.assertEquals(0, execResult16.getExitCode());
}
public String getMockServerConfig() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
new file mode 100644
index 0000000000..387201b379
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Http {
+ result_table_name = "http"
+ url = "http://mockserver:1080/query/pagesNoPageNum"
+ method = "GET"
+ format = "json"
+ json_field = {
+ name = "$.data[*].name"
+ age = "$.data[*].age"
+ }
+ pageing = {
+ batch_size=10
+ page_field = page
+ }
+ schema = {
+ fields {
+ name = string
+ age = int
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "http"
+ }
+ Assert {
+ source_table_name = "http"
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 12
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 12
+ }
+ ]
+ field_rules = [
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
new file mode 100644
index 0000000000..c3202d6f7b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Http {
+ result_table_name = "http"
+ url = "http://mockserver:1080/query/pages"
+ method = "GET"
+ format = "json"
+ json_field = {
+ name = "$.data[*].name"
+ age = "$.data[*].age"
+ }
+ pageing = {
+ total_page_size = 2
+ page_field = page
+ }
+ schema = {
+ fields {
+ name = string
+ age = int
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "http"
+ }
+ Assert {
+ source_table_name = "http"
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 4
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 4
+ }
+ ]
+ field_rules = [
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
index 2c419277e0..9cb561225d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
@@ -4526,5 +4526,149 @@
"Content-Type": "application/json"
}
}
+ },
+ {
+ "httpRequest": {
+ "method" : "GET",
+ "path": "/query/pages",
+ "queryStringParameters": {
+ "page": "1"
+ }
+ },
+ "httpResponse": {
+ "body":
+ {
+ "status": null,
+ "msg": null,
+ "data": [
+ {
+ "name": "name1",
+ "age": 69
+ },
+ {
+ "name": "name2",
+ "age": 51
+ }
+ ],
+ "currentPageIndex": 1,
+ "totalPage": 2
+ }
+ }
+ },
+ {
+ "httpRequest": {
+ "method" : "GET",
+ "path": "/query/pages",
+ "queryStringParameters": {
+ "page": "2"
+ }
+ },
+ "httpResponse": {
+ "body":
+ {
+ "status": null,
+ "msg": null,
+ "data": [
+ {
+ "name": "name1",
+ "age": 69
+ },
+ {
+ "name": "name2",
+ "age": 51
+ }
+ ],
+ "currentPageIndex": 2,
+ "totalPage": 2
+ }
+ }
+ },
+ {
+ "httpRequest": {
+ "method" : "GET",
+ "path": "/query/pagesNoPageNum",
+ "queryStringParameters": {
+ "page": "1"
+ }
+ },
+ "httpResponse": {
+ "body":
+ {
+ "status": null,
+ "msg": null,
+ "data": [
+ {
+ "name": "name1",
+ "age": 69
+ },
+ {
+ "name": "name2",
+ "age": 51
+ },
+ {
+ "name": "name3",
+ "age": 36
+ },
+ {
+ "name": "name4",
+ "age": 51
+ },
+ {
+ "name": "name5",
+ "age": 74
+ },
+ {
+ "name": "name6",
+ "age": 51
+ },
+ {
+ "name": "name7",
+ "age": 67
+ },
+ {
+ "name": "name8",
+ "age": 12
+ },
+ {
+ "name": "name9",
+ "age": 45
+ },
+ {
+ "name": "name10",
+ "age": 23
+ }
+ ],
+ "currentPageIndex": 1,
+ "hasNext": true
+ }
+ }
+ },
+ {
+ "httpRequest": {
+ "method" : "GET",
+ "path": "/query/pagesNoPageNum",
+ "queryStringParameters": {
+ "page": "2"
+ }
+ },
+ "httpResponse": {
+ "body":
+ {
+ "status": null,
+ "msg": null,
+ "data": [
+ {
+ "name": "name11",
+ "age": 69
+ },
+ {
+ "name": "name22",
+ "age": 51
+ }
+ ],
+ "currentPageIndex": 2,
+ "hasNext": false
+ }
+ }
}
]