Hisoka-X commented on code in PR #9869:
URL: https://github.com/apache/seatunnel/pull/9869#discussion_r2357432393
##########
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java:
##########
@@ -82,6 +89,48 @@ public List<String> zrange(final String key, final long start, final long stop)
return jedisCluster.zrange(key, start, stop);
}
+ @Override
+ public String info() {
+ Map<String, ConnectionPool> nodes = jedisCluster.getClusterNodes();
+ if (nodes.isEmpty()) {
+ throw new RedisConnectorException(
+ GET_REDIS_INFO_ERROR, "No available nodes in cluster");
+ }
+
+ // Traverse all nodes and try to obtain the info
+ Exception lastException = null;
+ for (Map.Entry<String, ConnectionPool> entry : nodes.entrySet()) {
+ ConnectionPool pool = entry.getValue();
+ try {
+ try (Jedis jedis = new Jedis(pool.getResource())) {
Review Comment:
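Ditto: same concern as the comment on `scanKeyResult` in `RedisClusterClient.java` below. Can we cache the Jedis client instead of wrapping a pooled connection in a new `Jedis` for each node?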
##########
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java:
##########
@@ -101,28 +108,28 @@ private static String getNormalKey(SeaTunnelRow element, List<String> fields, St
}
}
- private String getCustomKey(SeaTunnelRow element, List<String> fields, String keyField) {
- String[] keyFieldSegments = keyField.split(REDIS_GROUP_DELIMITER);
- StringBuilder key = new StringBuilder();
- for (int i = 0; i < keyFieldSegments.length; i++) {
- String keyFieldSegment = keyFieldSegments[i];
- if (keyFieldSegment.startsWith(LEFT_PLACEHOLDER_MARKER)
- && keyFieldSegment.endsWith(RIGHT_PLACEHOLDER_MARKER)) {
- String realKeyField = keyFieldSegment.substring(1, keyFieldSegment.length() - 1);
- if (fields.contains(realKeyField)) {
- Object realFieldValue = element.getField(fields.indexOf(realKeyField));
- key.append(realFieldValue == null ? "" : realFieldValue.toString());
- } else {
- key.append(keyFieldSegment);
- }
- } else {
- key.append(keyFieldSegment);
- }
- if (i != keyFieldSegments.length - 1) {
- key.append(REDIS_GROUP_DELIMITER);
- }
+ protected String getCustomKey(SeaTunnelRow element, List<String> fields, String keyField) {
+ Matcher matcher = PLACEHOLDER_PATTERN.matcher(keyField);
+ StringBuffer result = new StringBuffer();
Review Comment:
Let's reuse `PlaceholderUtils` here instead of hand-rolling the placeholder replacement with a `Matcher` and a `StringBuffer`.
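To make the request concrete, here is a minimal, self-contained sketch of the kind of shared placeholder helper the key-building logic could delegate to. The actual `PlaceholderUtils` API is not shown in this thread, so `KeyPlaceholders` and `resolve` are hypothetical names, and `{field}` braces are assumed to be the placeholder markers. It keeps the behavior of the removed code: unknown fields stay as literal text and `null` values become empty strings.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; NOT the real PlaceholderUtils API. */
final class KeyPlaceholders {
    // Assumption: placeholders look like {fieldName}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^{}]+)\\}");

    private KeyPlaceholders() {}

    /** Replaces each {field} in the template with the matching row value. */
    static String resolve(String template, List<String> fields, Object[] values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder result = new StringBuilder();
        int last = 0;
        while (matcher.find()) {
            // Copy the literal text between the previous match and this one.
            result.append(template, last, matcher.start());
            int index = fields.indexOf(matcher.group(1));
            if (index < 0) {
                // Unknown field: keep the placeholder text unchanged.
                result.append(matcher.group());
            } else {
                Object value = values[index];
                result.append(value == null ? "" : value.toString());
            }
            last = matcher.end();
        }
        // Copy whatever literal text follows the last placeholder.
        result.append(template, last, template.length());
        return result.toString();
    }
}
```

Appending by index into a `StringBuilder` avoids `Matcher.appendReplacement`, which would treat `$` and `\` in field values as special characters.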
##########
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java:
##########
@@ -155,10 +162,66 @@ public void batchWriteZset(
int size = keys.size();
for (int i = 0; i < size; i++) {
if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == RowKind.UPDATE_BEFORE) {
- RedisDataType.ZSET.del(this, keys.get(i), values.get(i));
+ RedisDataType.ZSET.del(jedis, keys.get(i), values.get(i));
+ } else {
+ RedisDataType.ZSET.set(jedis, keys.get(i), values.get(i), expireSeconds);
+ }
+ }
+ }
+
+ /** In cluster mode, traverse and scan each node key */
+ @Override
+ public ScanResult<String> scanKeyResult(
+ final String cursor, final ScanParams params, final RedisDataType type) {
+ // Create a composite cursor to traverse the cluster nodes: the format is "Node Index: Node
+ // cursor"
+ int nodeIndex = 0;
+ String nodeCursor = cursor;
+ boolean isFirstScan = !cursor.contains(":");
+
+ if (!ScanParams.SCAN_POINTER_START.equals(cursor) && cursor.contains(":")) {
+ String[] parts = cursor.split(":", 2);
+ nodeIndex = Integer.parseInt(parts[0]);
+ nodeCursor = parts[1];
+ }
+
+ // All nodes have been scanned
+ if (nodeIndex >= nodes.size()) {
+ return new ScanResult<>(ScanParams.SCAN_POINTER_START, new ArrayList<>());
+ }
+
+ ConnectionPool pool = nodes.get(nodeIndex).getValue();
+ List<String> resultKeys;
+ String nextCursor;
+
+ try (Jedis jedis = new Jedis(pool.getResource())) {
Review Comment:
Can we cache the Jedis client instead of creating a new one on every scan call, to save CPU cost?
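One possible shape for such a cache, as a rough sketch rather than a prescription: keep one client per node key and create it lazily. `NodeClientCache` and its factory parameter are hypothetical names; the sketch is generic over `AutoCloseable` so it runs without Jedis on the classpath. In the connector, the factory would presumably wrap `pool.getResource()` for the given node.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical per-node client cache; illustration only, not code from the PR. */
final class NodeClientCache<C extends AutoCloseable> implements AutoCloseable {
    private final Map<String, C> clients = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    NodeClientCache(Function<String, C> factory) {
        this.factory = factory;
    }

    /** Returns the cached client for a node, creating it on first use. */
    C get(String node) {
        return clients.computeIfAbsent(node, factory);
    }

    /** Closes every cached client; the first failure is rethrown, later ones are suppressed. */
    @Override
    public void close() throws Exception {
        Exception first = null;
        for (C client : clients.values()) {
            try {
                client.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        clients.clear();
        if (first != null) {
            throw first;
        }
    }
}
```

One caveat with this approach: a `Jedis` instance is not thread-safe, so a cache like this is only appropriate if each reader or writer owns its own cache and closes it when the task finishes.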
##########
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java:
##########
@@ -82,6 +89,48 @@ public List<String> zrange(final String key, final long start, final long stop)
return jedisCluster.zrange(key, start, stop);
}
+ @Override
+ public String info() {
Review Comment:
Could you explain how this method fixes https://github.com/apache/seatunnel/issues/9865?