Alberne opened a new issue, #9474:
URL: https://github.com/apache/seatunnel/issues/9474

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   The Http source plugin currently supports page-number pagination and cursor-based pagination. In the following code, a NullPointerException (NPE) occurs when `pageField` is null. The offending call is `map.containsKey(pageField)`.
   
   
   ```
       private void processPageMap(
               Map<String, String> map,
               String pageField,
               String pageValue,
               boolean usePlaceholderReplacement) {
           if (usePlaceholderReplacement) {
               // Placeholder replacement
               Map<String, String> updatedMap = new HashMap<>();
               for (Map.Entry<String, String> entry : map.entrySet()) {
                   String key = entry.getKey();
                   String value = entry.getValue();
                   String replacedValue = replacePlaceholder(value, pageField, pageValue);
                   if (replacedValue != null) {
                       updatedMap.put(key, replacedValue);
                   }
               }
               map.putAll(updatedMap);
           // An NPE (NullPointerException) occurs on the next line when pageField is null
           } else if (map.containsKey(pageField)) {
               // Key-based replacement
               map.put(pageField, pageValue);
           }
       }
   ```
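One possible guard, as a minimal sketch rather than the project's actual fix: check `pageField` for null before the key-based lookup. Note that `HashMap` tolerates `containsKey(null)`, while maps that reject null keys (for example `ConcurrentHashMap`) throw, so the failure depends on the map implementation behind the headers and params. The class name `PageMapGuard`, the `replacePlaceholder` stand-in, and its `${field}` placeholder syntax are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PageMapGuard {

    // Hypothetical stand-in for the connector's replacePlaceholder helper;
    // the real implementation and placeholder syntax are not shown in this issue.
    static String replacePlaceholder(String value, String field, String fieldValue) {
        if (value == null || field == null || fieldValue == null) {
            return null;
        }
        return value.replace("${" + field + "}", fieldValue);
    }

    static void processPageMap(
            Map<String, String> map,
            String pageField,
            String pageValue,
            boolean usePlaceholderReplacement) {
        if (usePlaceholderReplacement) {
            // Placeholder replacement (unchanged from the original logic)
            Map<String, String> updatedMap = new HashMap<>();
            for (Map.Entry<String, String> entry : map.entrySet()) {
                String replaced = replacePlaceholder(entry.getValue(), pageField, pageValue);
                if (replaced != null) {
                    updatedMap.put(entry.getKey(), replaced);
                }
            }
            map.putAll(updatedMap);
        } else if (pageField != null && map.containsKey(pageField)) {
            // Key-based replacement, now skipped when no field name is configured
            map.put(pageField, pageValue);
        }
    }

    public static void main(String[] args) {
        // ConcurrentHashMap rejects null keys, so an unguarded lookup would throw here
        Map<String, String> params = new ConcurrentHashMap<>();
        params.put("cursor", "");
        processPageMap(params, null, "1", false);       // no-op instead of an NPE
        processPageMap(params, "cursor", "abc", false); // normal key-based update
        System.out.println(params); // prints {cursor=abc}
    }
}
```

The guard is placed only on the key-based branch so that the placeholder branch keeps its current behavior.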
   
   `processPageMap` is called only from the `updateRequestParam` function:
   ```
   
   // Process headers
           if (MapUtils.isNotEmpty(this.httpParameter.getHeaders())) {
               processPageMap(
                       this.httpParameter.getHeaders(),
                       pageField,
                       pageValue.toString(),
                       usePlaceholderReplacement);
   
               processPageMap(
                       this.httpParameter.getHeaders(),
                       pageInfo.getPageCursorFieldName(),
                       pageInfo.getCursor(),
                       usePlaceholderReplacement);
           }
           // if not set keepPageParamAsHttpParam, but page field is in params, then set page index as
           if (MapUtils.isNotEmpty(this.httpParameter.getParams())) {
   
               processPageMap(
                       this.httpParameter.getParams(),
                       pageField,
                       pageValue.toString(),
                       usePlaceholderReplacement);
               processPageMap(
                       this.httpParameter.getParams(),
                       pageInfo.getPageCursorFieldName(),
                       pageInfo.getCursor(),
                       usePlaceholderReplacement);
           }
   ```
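The call sites above pass both the page field and the cursor field unconditionally, so with cursor pagination the page field is presumably null on every call. An alternative to guarding inside `processPageMap` would be to skip the update at the call site when a field name is not configured. The sketch below illustrates that idea for the key-based case only; `CallSiteGuard` and `putIfFieldPresent` are hypothetical names, not part of the connector.

```java
import java.util.Map;
import java.util.TreeMap;

class CallSiteGuard {

    // Hypothetical helper: update the entry only when the field name is
    // configured, the value is known, and the key already exists in the map.
    static boolean putIfFieldPresent(
            Map<String, String> target, String fieldName, String fieldValue) {
        if (fieldName == null || fieldValue == null) {
            return false; // nothing configured for this pagination mode
        }
        if (!target.containsKey(fieldName)) {
            return false;
        }
        target.put(fieldName, fieldValue);
        return true;
    }

    public static void main(String[] args) {
        // TreeMap with natural ordering is another map type that rejects null keys
        Map<String, String> params = new TreeMap<>();
        params.put("cursor", "start");

        String pageField = null;        // assumed: cursor mode sets no page field
        String cursorField = "cursor";

        System.out.println(putIfFieldPresent(params, pageField, "1"));            // prints false
        System.out.println(putIfFieldPresent(params, cursorField, "next-token")); // prints true
        System.out.println(params); // prints {cursor=next-token}
    }
}
```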
   
![Image](https://github.com/user-attachments/assets/12e45b95-d56b-4c96-bf0d-587c9c771f03)
   
   ### SeaTunnel Version
   
   2.3.11
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = 1
     job.mode = "BATCH"
   }
   
   source {
     Http {
       plugin_output = "http"
       url = "http://127.0.0.1:5000/api/cursor_data";
       method = "GET"
   
       format = "json"
       content_field = "$.data.*"
       keep_page_param_as_http_param = true
       pageing = {
         page_type = "Cursor"
         cursor_field = "cursor"
         cursor_response_field = "$.paging.cursors.next"
       }

       schema = {
         fields {
           content = string
           id = int
           name = string
         }
       }

       json_field = {
         content = "$.data[*].content"
         id = "$.data[*].id"
         name = "$.data[*].name"
       }
     }
   }
   
   
   # Console printing of the read Http data
   sink {
     Console {
       parallelism = 1
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   # Assuming the config file is `http_paging.config`
   
   ${SEATUNNEL_HOME}/bin/seatunnel.sh --deploy-mode local -c http_paging.config
   ```
   
   ### Error Exception
   
   ```log
   No explicit exception is reported when updateRequestParam fails inside the try...finally block below (there is no catch clause):
   
   
    @Override
       public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
           try {
               if (pageInfoOptional.isPresent()) {
                   noMoreElementFlag = false;
                   PageInfo info = pageInfoOptional.get();
                   // cursor pagination
                   if (HttpPaginationType.CURSOR.getCode().equals(info.getPageType())) {
                       while (!noMoreElementFlag) {
                           updateRequestParam(info, info.isUsePlaceholderReplacement());
                           pollAndCollectData(output);
                           Thread.sleep(10);
                       }
   
                   } else {
                       // default page number pagination
                       Long pageIndex = info.getPageIndex();
                       while (!noMoreElementFlag) {
                           // increment page
                           info.setPageIndex(pageIndex);
                           // set request param
                           updateRequestParam(info, info.isUsePlaceholderReplacement());
                           pollAndCollectData(output);
                           pageIndex += 1;
                           Thread.sleep(10);
                       }
                   }
               } else {
                   pollAndCollectData(output);
               }
           } finally {
               if (Boundedness.BOUNDED.equals(context.getBoundedness()) && noMoreElementFlag) {
                   // signal to the source that we have reached the end of the data.
                   log.info("Closed the bounded http source");
                   context.signalNoMoreElement();
               } else {
                   if (httpParameter.getPollIntervalMillis() > 0) {
                       Thread.sleep(httpParameter.getPollIntervalMillis());
                   }
               }
           }
       }
   ```
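For reference, plain Java `try...finally` semantics do not swallow an exception by themselves: the `finally` block runs and the exception then propagates to the caller. Whether the NPE ends up visible in the job logs therefore depends on how the caller of `internalPollNext` handles it, which is not shown in this issue. The sketch below only demonstrates the language behavior; `TryFinallyDemo` and `pollOnce` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TryFinallyDemo {

    // Records the order in which the code paths run
    static final List<String> events = new ArrayList<>();

    static void pollOnce(Map<String, String> params, String pageField) {
        try {
            events.add("try");
            // Throws NullPointerException for a null key on ConcurrentHashMap
            params.containsKey(pageField);
            events.add("after-update"); // not reached when the lookup throws
        } finally {
            // Runs even though the try block failed; the exception still propagates
            events.add("finally");
        }
    }

    public static void main(String[] args) {
        try {
            pollOnce(new ConcurrentHashMap<>(), null);
        } catch (NullPointerException e) {
            events.add("caller-caught");
        }
        System.out.println(events); // prints [try, finally, caller-caught]
    }
}
```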
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


