This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new aeed71cebf [Fix][Connector-v2][Easysearch] Handle optional password in credentials and add clearScroll method (#10161)
aeed71cebf is described below
commit aeed71cebf227825507b912e78aa64b72e510bc7
Author: Jast <[email protected]>
AuthorDate: Wed Dec 17 20:15:13 2025 +0800
[Fix][Connector-v2][Easysearch] Handle optional password in credentials and add clearScroll method (#10161)
---
.../easysearch/client/EasysearchClient.java | 36 +++++++++++++++++++-
.../easysearch/source/EasysearchSourceReader.java | 38 +++++++++++++++-------
.../e2e/connector/easysearch/EasysearchIT.java | 14 ++++++++
3 files changed, 75 insertions(+), 13 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
index df3153603e..e6f9ca68d5 100644
--- a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
+++ b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
@@ -158,10 +158,14 @@ public class EasysearchClient {
restClientBuilder.setHttpClientConfigCallback(
httpClientBuilder -> {
if (username.isPresent()) {
+ String passwordStr = null;
+ if (password.isPresent()) {
+ passwordStr = password.get();
+ }
CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
- new
UsernamePasswordCredentials(username.get(), password.get()));
+ new
UsernamePasswordCredentials(username.get(), passwordStr));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
@@ -289,6 +293,36 @@ public class EasysearchClient {
}
}
+ public boolean clearScroll(String scrollId) {
+ if (scrollId == null || scrollId.isEmpty()) {
+ return false;
+ }
+
+ String endpoint = "/_search/scroll";
+ Request request = new Request("DELETE", endpoint);
+ Map<String, String> param = new HashMap<>();
+ param.put("scroll_id", scrollId);
+ request.setJsonEntity(JsonUtils.toJsonString(param));
+
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ log.warn("DELETE {} response null when clearing scrollId {}",
endpoint, scrollId);
+ return false;
+ }
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode == HttpStatus.SC_OK) {
+ return true;
+ } else {
+ log.warn("Failed to clear scrollId {}, status code={}",
scrollId, statusCode);
+ return false;
+ }
+ } catch (IOException e) {
+ log.warn("Error clearing scrollId " + scrollId, e);
+ return false;
+ }
+ }
+
/**
* first time to request search documents by scroll call
/${index}/_search?scroll=${scroll}
*
diff --git a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceReader.java b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceReader.java
index 22a408ea4d..29c8cfda7f 100644
--- a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceReader.java
+++ b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceReader.java
@@ -74,19 +74,33 @@ public class EasysearchSourceReader implements SourceReader<SeaTunnelRow, Easyse
EasysearchSourceSplit split = splits.poll();
if (split != null) {
SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
- ScrollResult scrollResult =
- ezsClient.searchByScroll(
- sourceIndexInfo.getIndex(),
- sourceIndexInfo.getSource(),
- sourceIndexInfo.getQuery(),
- sourceIndexInfo.getScrollTime(),
- sourceIndexInfo.getScrollSize());
- outputFromScrollResult(scrollResult,
sourceIndexInfo.getSource(), output);
- while (scrollResult.getDocs() != null &&
scrollResult.getDocs().size() > 0) {
- scrollResult =
- ezsClient.searchWithScrollId(
- scrollResult.getScrollId(),
sourceIndexInfo.getScrollTime());
+ String scrollId = null;
+ try {
+ ScrollResult scrollResult =
+ ezsClient.searchByScroll(
+ sourceIndexInfo.getIndex(),
+ sourceIndexInfo.getSource(),
+ sourceIndexInfo.getQuery(),
+ sourceIndexInfo.getScrollTime(),
+ sourceIndexInfo.getScrollSize());
+ scrollId = scrollResult.getScrollId();
outputFromScrollResult(scrollResult,
sourceIndexInfo.getSource(), output);
+ while (scrollResult.getDocs() != null &&
scrollResult.getDocs().size() > 0) {
+ scrollResult =
+ ezsClient.searchWithScrollId(
+ scrollResult.getScrollId(),
+ sourceIndexInfo.getScrollTime());
+ scrollId = scrollResult.getScrollId();
+ outputFromScrollResult(scrollResult,
sourceIndexInfo.getSource(), output);
+ }
+ } finally {
+ if (scrollId != null && !scrollId.isEmpty()) {
+ try {
+ ezsClient.clearScroll(scrollId);
+ } catch (Exception e) {
+ log.warn("Failed to clear Easysearch scrollId: " +
scrollId, e);
+ }
+ }
}
} else if (noMoreSplit) {
// signal to the source that we have reached the end of the
data.
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
index dce6cbfffd..b02886c04a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
@@ -209,6 +209,7 @@ public class EasysearchIT extends TestSuiteBase implements TestResource {
query.put("range", range);
ScrollResult scrollResult =
easysearchClient.searchByScroll(indexName, source, query,
"1m", 1000);
+ String scrollId = scrollResult.getScrollId();
scrollResult
.getDocs()
.forEach(
@@ -231,6 +232,12 @@ public class EasysearchIT extends TestSuiteBase implements TestResource {
o ->
Integer.valueOf(o.get("c_int").toString())))
.map(JsonUtils::toJsonString)
.collect(Collectors.toList());
+
+ if (scrollId != null && !scrollId.isEmpty()) {
+ boolean cleared = easysearchClient.clearScroll(scrollId);
+ Assertions.assertTrue(cleared);
+ }
+
return docs;
}
@@ -344,6 +351,7 @@ public class EasysearchIT extends TestSuiteBase implements TestResource {
query.put("range", range);
ScrollResult scrollResult =
easysearchClient.searchByScroll("st_index2", source, query,
"1m", 1000);
+ String scrollId = scrollResult.getScrollId();
scrollResult
.getDocs()
.forEach(
@@ -366,6 +374,12 @@ public class EasysearchIT extends TestSuiteBase implements TestResource {
o ->
Integer.valueOf(o.get("c_int").toString())))
.map(JsonUtils::toJsonString)
.collect(Collectors.toList());
+
+ if (scrollId != null && !scrollId.isEmpty()) {
+ boolean cleared = easysearchClient.clearScroll(scrollId);
+ Assertions.assertTrue(cleared);
+ }
+
return docs;
}