This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ed15f0dcf9 [Feature][Connector-V2] Support choosing the start page in 
http paging (#7180)
ed15f0dcf9 is described below

commit ed15f0dcf941b22b5ea55bb21ca2b7ceacff07dd
Author: Gxinge <[email protected]>
AuthorDate: Thu Aug 22 10:39:27 2024 +0800

    [Feature][Connector-V2] Support choosing the start page in http paging (#7180)
    
    * feature-http page specifies the home page
    
    * update
    
    * update
    
    * update
    
    ---------
    
    Co-authored-by: gaoxi <[email protected]>
    Co-authored-by: Jia Fan <[email protected]>
---
 docs/en/connector-v2/source/Http.md                |  1 +
 .../seatunnel/http/config/HttpConfig.java          |  5 ++
 .../seatunnel/http/source/HttpSource.java          |  5 ++
 .../seatunnel/http/source/HttpSourceFactory.java   |  1 +
 .../seatunnel/http/source/HttpSourceReader.java    |  4 +-
 .../seatunnel/e2e/connector/http/HttpIT.java       |  4 ++
 .../resources/http_page_increase_start_num.conf    | 83 ++++++++++++++++++++++
 7 files changed, 101 insertions(+), 2 deletions(-)

diff --git a/docs/en/connector-v2/source/Http.md 
b/docs/en/connector-v2/source/Http.md
index 78ceccf442..318b8cf00a 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -52,6 +52,7 @@ They can be downloaded via install-plugin.sh or from the 
Maven central repositor
 | pageing.page_field          | String  | No       | -       | This parameter 
is used to specify the page field name in the request parameter                 
                                      |
 | pageing.total_page_size     | Int     | No       | -       | This parameter 
is used to control the total number of pages                                    
                                      |
 | pageing.batch_size          | Int     | No       | -       | The batch size 
returned per request is used to determine whether to continue when the total 
number of pages is unknown               |
+| pageing.start_page_number   | Int     | No       | 1       | Specify the 
page number from which synchronization starts                                   
                                         |
 | content_json                | String  | No       | -       | This parameter 
can get some json data.If you only need the data in the 'book' section, 
configure `content_field = "$.store.book.*"`. |
 | format                      | String  | No       | text    | The format of 
upstream data, now only support `json` `text`, default `text`.                  
                                       |
 | method                      | String  | No       | get     | Http request 
method, only supports GET, POST method.                                         
                                        |
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 043f907e44..489b8d124b 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -44,6 +44,11 @@ public class HttpConfig {
                     .defaultValue(100)
                     .withDescription(
                             "the batch size returned per request is used to 
determine whether to continue when the total number of pages is unknown");
+    public static final Option<Long> START_PAGE_NUMBER =
+            Options.key("start_page_number")
+                    .longType()
+                    .defaultValue(1L)
+                    .withDescription("which page to start synchronizing from");
     public static final Option<String> PAGE_FIELD =
             Options.key("page_field")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index c41e8a9a84..69c87e4b91 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -101,6 +101,11 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
             } else {
                 
pageInfo.setTotalPageSize(HttpConfig.TOTAL_PAGE_SIZE.defaultValue());
             }
+            if (pageConfig.hasPath(HttpConfig.START_PAGE_NUMBER.key())) {
+                
pageInfo.setPageIndex(pageConfig.getLong(HttpConfig.START_PAGE_NUMBER.key()));
+            } else {
+                
pageInfo.setPageIndex(HttpConfig.START_PAGE_NUMBER.defaultValue());
+            }
 
             if (pageConfig.hasPath(HttpConfig.BATCH_SIZE.key())) {
                 
pageInfo.setBatchSize(pageConfig.getInt(HttpConfig.BATCH_SIZE.key()));
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
index c0a276d723..853a0f2c69 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
@@ -60,6 +60,7 @@ public class HttpSourceFactory implements TableSourceFactory {
                 .optional(HttpConfig.PARAMS)
                 .optional(HttpConfig.FORMAT)
                 .optional(HttpConfig.BODY)
+                .optional(HttpConfig.PAGEING)
                 .optional(HttpConfig.JSON_FIELD)
                 .optional(HttpConfig.CONTENT_FIELD)
                 .conditional(
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 104d769ef5..051507e8e3 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -158,9 +158,9 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
         try {
             if (pageInfoOptional.isPresent()) {
                 noMoreElementFlag = false;
-                Long pageIndex = 1L;
+                PageInfo info = pageInfoOptional.get();
+                Long pageIndex = info.getPageIndex();
                 while (!noMoreElementFlag) {
-                    PageInfo info = pageInfoOptional.get();
                     // increment page
                     info.setPageIndex(pageIndex);
                     // set request param
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index ab8acd1f86..9e9bed031d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -229,6 +229,10 @@ public class HttpIT extends TestSuiteBase implements 
TestResource {
 
         Container.ExecResult execResult18 = 
container.executeJob("/httpnoschema_to_http.conf");
         Assertions.assertEquals(0, execResult18.getExitCode());
+
+        Container.ExecResult execResult19 =
+                container.executeJob("/http_page_increase_start_num.conf");
+        Assertions.assertEquals(0, execResult19.getExitCode());
     }
 
     @DisabledOnContainer(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
new file mode 100644
index 0000000000..60ced88b2d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Http {
+    result_table_name = "http"
+    url = "http://mockserver:1080/query/pages"
+    method = "GET"
+    format = "json"
+    json_field = {
+      name = "$.data[*].name"
+      age = "$.data[*].age"
+    }
+    pageing = {
+      total_page_size = 2
+      page_field = page
+      start_page_number = 2
+    }
+    schema = {
+      fields {
+        name = string
+        age = int
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "http"
+    rules {
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 2
+        },
+        {
+          rule_type = MAX_ROW
+          rule_value = 2
+        }
+      ]
+      field_rules = [
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = age
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}

Reply via email to