This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new aeed71cebf [Fix][Connector-v2][Easysearch] Handle optional password in credentials and add clearScroll method (#10161)
aeed71cebf is described below

commit aeed71cebf227825507b912e78aa64b72e510bc7
Author: Jast <[email protected]>
AuthorDate: Wed Dec 17 20:15:13 2025 +0800

    [Fix][Connector-v2][Easysearch] Handle optional password in credentials and add clearScroll method (#10161)
---
 .../easysearch/client/EasysearchClient.java        | 36 +++++++++++++++++++-
 .../easysearch/source/EasysearchSourceReader.java  | 38 +++++++++++++++-------
 .../e2e/connector/easysearch/EasysearchIT.java     | 14 ++++++++
 3 files changed, 75 insertions(+), 13 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
index df3153603e..e6f9ca68d5 100644
--- a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
+++ b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
@@ -158,10 +158,14 @@ public class EasysearchClient {
         restClientBuilder.setHttpClientConfigCallback(
                 httpClientBuilder -> {
                     if (username.isPresent()) {
+                        String passwordStr = null;
+                        if (password.isPresent()) {
+                            passwordStr = password.get();
+                        }
                         CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
                         credentialsProvider.setCredentials(
                                 AuthScope.ANY,
-                                new 
UsernamePasswordCredentials(username.get(), password.get()));
+                                new 
UsernamePasswordCredentials(username.get(), passwordStr));
                         
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                     }
 
@@ -289,6 +293,36 @@ public class EasysearchClient {
         }
     }
 
+    public boolean clearScroll(String scrollId) {
+        if (scrollId == null || scrollId.isEmpty()) {
+            return false;
+        }
+
+        String endpoint = "/_search/scroll";
+        Request request = new Request("DELETE", endpoint);
+        Map<String, String> param = new HashMap<>();
+        param.put("scroll_id", scrollId);
+        request.setJsonEntity(JsonUtils.toJsonString(param));
+
+        try {
+            Response response = restClient.performRequest(request);
+            if (response == null) {
+                log.warn("DELETE {} response null when clearing scrollId {}", 
endpoint, scrollId);
+                return false;
+            }
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == HttpStatus.SC_OK) {
+                return true;
+            } else {
+                log.warn("Failed to clear scrollId {}, status code={}", 
scrollId, statusCode);
+                return false;
+            }
+        } catch (IOException e) {
+            log.warn("Error clearing scrollId " + scrollId, e);
+            return false;
+        }
+    }
+
     /**
      * first time to request search documents by scroll call 
/${index}/_search?scroll=${scroll}
      *
diff --git a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceReader.java b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceReader.java
index 22a408ea4d..29c8cfda7f 100644
--- a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceReader.java
+++ b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceReader.java
@@ -74,19 +74,33 @@ public class EasysearchSourceReader implements SourceReader<SeaTunnelRow, Easyse
             EasysearchSourceSplit split = splits.poll();
             if (split != null) {
                 SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
-                ScrollResult scrollResult =
-                        ezsClient.searchByScroll(
-                                sourceIndexInfo.getIndex(),
-                                sourceIndexInfo.getSource(),
-                                sourceIndexInfo.getQuery(),
-                                sourceIndexInfo.getScrollTime(),
-                                sourceIndexInfo.getScrollSize());
-                outputFromScrollResult(scrollResult, 
sourceIndexInfo.getSource(), output);
-                while (scrollResult.getDocs() != null && 
scrollResult.getDocs().size() > 0) {
-                    scrollResult =
-                            ezsClient.searchWithScrollId(
-                                    scrollResult.getScrollId(), 
sourceIndexInfo.getScrollTime());
+                String scrollId = null;
+                try {
+                    ScrollResult scrollResult =
+                            ezsClient.searchByScroll(
+                                    sourceIndexInfo.getIndex(),
+                                    sourceIndexInfo.getSource(),
+                                    sourceIndexInfo.getQuery(),
+                                    sourceIndexInfo.getScrollTime(),
+                                    sourceIndexInfo.getScrollSize());
+                    scrollId = scrollResult.getScrollId();
                     outputFromScrollResult(scrollResult, 
sourceIndexInfo.getSource(), output);
+                    while (scrollResult.getDocs() != null && 
scrollResult.getDocs().size() > 0) {
+                        scrollResult =
+                                ezsClient.searchWithScrollId(
+                                        scrollResult.getScrollId(),
+                                        sourceIndexInfo.getScrollTime());
+                        scrollId = scrollResult.getScrollId();
+                        outputFromScrollResult(scrollResult, 
sourceIndexInfo.getSource(), output);
+                    }
+                } finally {
+                    if (scrollId != null && !scrollId.isEmpty()) {
+                        try {
+                            ezsClient.clearScroll(scrollId);
+                        } catch (Exception e) {
+                            log.warn("Failed to clear Easysearch scrollId: " + 
scrollId, e);
+                        }
+                    }
                 }
             } else if (noMoreSplit) {
                 // signal to the source that we have reached the end of the 
data.
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
index dce6cbfffd..b02886c04a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
@@ -209,6 +209,7 @@ public class EasysearchIT extends TestSuiteBase implements TestResource {
         query.put("range", range);
         ScrollResult scrollResult =
                 easysearchClient.searchByScroll(indexName, source, query, 
"1m", 1000);
+        String scrollId = scrollResult.getScrollId();
         scrollResult
                 .getDocs()
                 .forEach(
@@ -231,6 +232,12 @@ public class EasysearchIT extends TestSuiteBase implements TestResource {
                                         o -> 
Integer.valueOf(o.get("c_int").toString())))
                         .map(JsonUtils::toJsonString)
                         .collect(Collectors.toList());
+
+        if (scrollId != null && !scrollId.isEmpty()) {
+            boolean cleared = easysearchClient.clearScroll(scrollId);
+            Assertions.assertTrue(cleared);
+        }
+
         return docs;
     }
 
@@ -344,6 +351,7 @@ public class EasysearchIT extends TestSuiteBase implements TestResource {
         query.put("range", range);
         ScrollResult scrollResult =
                 easysearchClient.searchByScroll("st_index2", source, query, 
"1m", 1000);
+        String scrollId = scrollResult.getScrollId();
         scrollResult
                 .getDocs()
                 .forEach(
@@ -366,6 +374,12 @@ public class EasysearchIT extends TestSuiteBase implements TestResource {
                                         o -> 
Integer.valueOf(o.get("c_int").toString())))
                         .map(JsonUtils::toJsonString)
                         .collect(Collectors.toList());
+
+        if (scrollId != null && !scrollId.isEmpty()) {
+            boolean cleared = easysearchClient.clearScroll(scrollId);
+            Assertions.assertTrue(cleared);
+        }
+
         return docs;
     }
 

Reply via email to