This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a0d6be  [Enhancement] Optimize some code, including querying label transaction status and others (#62)
7a0d6be is described below

commit 7a0d6be309d0fe5266e41e6f8e38b1a0266a2fd1
Author: wangchuang <59386838+chuang-wang-...@users.noreply.github.com>
AuthorDate: Wed Jan 22 10:24:30 2025 +0800

    [Enhancement] Optimize some code, including querying label transaction status and others (#62)
---
 .../doris/kafka/connector/DorisSinkConnector.java  |  2 +-
 .../connection/JdbcConnectionProvider.java         |  8 +++----
 .../connector/metrics/DorisConnectMonitor.java     |  2 +-
 .../connector/metrics/MetricsJmxReporter.java      |  2 +-
 .../doris/kafka/connector/service/RestService.java |  5 ++---
 .../connector/utils/BackoffAndRetryUtils.java      |  4 ++--
 .../kafka/connector/writer/LabelGenerator.java     | 26 +++++++++++++---------
 .../kafka/connector/writer/StreamLoadWriter.java   | 17 +++++---------
 .../connector/writer/commit/DorisCommitter.java    |  9 ++------
 .../kafka/connector/writer/load/CopyLoad.java      |  2 +-
 10 files changed, 35 insertions(+), 42 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
index 0bf90bd..91347e4 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
@@ -79,7 +79,7 @@ public class DorisSinkConnector extends SinkConnector {
         List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
         for (int i = 0; i < maxTasks; i++) {
             Map<String, String> conf = new HashMap<>(config);
-            conf.put(ConfigCheckUtils.TASK_ID, i + "");
+            conf.put(ConfigCheckUtils.TASK_ID, String.valueOf(i));
             taskConfigs.add(conf);
         }
         return taskConfigs;
diff --git a/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java b/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java
index 57a83fe..47da2d9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java
+++ b/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java
@@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory;
 public class JdbcConnectionProvider implements ConnectionProvider, Serializable {
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionProvider.class);
-    protected final String driverName = "com.mysql.jdbc.Driver";
-    protected final String cjDriverName = "com.mysql.cj.jdbc.Driver";
+    protected static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
+    protected static final String CJ_DRIVER_NAME = "com.mysql.cj.jdbc.Driver";
     private static final String JDBC_URL_TEMPLATE = "jdbc:mysql://%s";
 
     private static final long serialVersionUID = 1L;
@@ -51,11 +51,11 @@ public class JdbcConnectionProvider implements ConnectionProvider, Serializable
             return connection;
         }
         try {
-            Class.forName(cjDriverName);
+            Class.forName(CJ_DRIVER_NAME);
         } catch (ClassNotFoundException ex) {
             LOG.warn(
                     "can not found class com.mysql.cj.jdbc.Driver, use class com.mysql.jdbc.Driver");
-            Class.forName(driverName);
+            Class.forName(DRIVER_NAME);
         }
         String jdbcUrl = String.format(JDBC_URL_TEMPLATE, options.getQueryUrl());
         if (!Objects.isNull(options.getUser())) {
diff --git a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
index c3b28a2..0fe7d00 100644
--- a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
+++ b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
@@ -81,7 +81,7 @@ public class DorisConnectMonitor {
         // partition reassignment
         LOG.debug(
                 "Registering metrics existing:{}",
-                metricsJmxReporter.getMetricRegistry().getMetrics().keySet().toString());
+                metricsJmxReporter.getMetricRegistry().getMetrics().keySet());
         metricsJmxReporter.removeMetricsFromRegistry(String.valueOf(taskId));
 
         try {
diff --git a/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java b/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java
index dc9da31..61fe552 100644
--- a/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java
@@ -89,7 +89,7 @@ public class MetricsJmxReporter {
                     "Metric registry:{}, size is:{}, names:{}",
                     prefixFilter,
                     metricRegistry.getMetrics().size(),
-                    metricRegistry.getMetrics().keySet().toString());
+                    metricRegistry.getMetrics().keySet());
         }
     }
 
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
index 7662aaa..45942f4 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
@@ -85,8 +85,7 @@ public class RestService {
                 HttpGet httpGet = new HttpGet(beUrl);
                 String response = send(options, httpGet, logger);
                 logger.info("Backend Info:{}", response);
-                List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
-                return backends;
+                return parseBackendV2(response, logger);
             } catch (ConnectedFailedException e) {
                 logger.info(
                         "Doris FE node {} is unavailable: {}, Request the next Doris FE node",
@@ -313,7 +312,7 @@ public class RestService {
     /** Get table schema from doris. */
     public static Schema getSchema(
             DorisOptions dorisOptions, String db, String table, Logger logger) {
-        logger.trace("start get " + db + "." + table + " schema from doris.");
+        logger.trace("start get {}.{} schema from doris.", db, table);
         Object responseData = null;
         try {
             String tableSchemaUri =
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java
index fe51b52..035b70e 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java
@@ -32,7 +32,7 @@ public class BackoffAndRetryUtils {
     private static final int[] backoffSec = {0, 1, 2, 4};
 
     /** Interfaces to define the lambda function to be used by backoffAndRetry */
-    public interface backoffFunction {
+    public interface BackoffFunction {
         Object apply() throws Exception;
     }
 
@@ -45,7 +45,7 @@ public class BackoffAndRetryUtils {
      * @throws Exception if the runnable function throws exception
      */
     public static Object backoffAndRetry(
-            final LoadOperation operation, final backoffFunction runnable) throws Exception {
+            final LoadOperation operation, final BackoffFunction runnable) throws Exception {
         for (final int iteration : backoffSec) {
             if (iteration != 0) {
                 Thread.sleep(iteration * 1000L);
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
index daca8b1..0d25c6f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
@@ -24,9 +24,9 @@ import java.util.concurrent.atomic.AtomicLong;
 
 /** Generator label for stream load. */
 public class LabelGenerator {
-    private String topic;
-    private int partition;
-    private String tableIdentifier;
+    private final String topic;
+    private final int partition;
+    private final String tableIdentifier;
     // The label of doris stream load cannot be repeated when loading.
     // Under special circumstances (usually load failure) when doris-kafka-connector is started,
     // stream load is performed at the same offset every time, which will cause label duplication.
@@ -35,8 +35,8 @@ public class LabelGenerator {
 
     public LabelGenerator(String topic, int partition, String tableIdentifier) {
         // The label of stream load can not contain `.`
-        this.tableIdentifier = tableIdentifier.replaceAll("\\.", "_");
-        this.topic = topic.replaceAll("\\.", "_");
+        this.tableIdentifier = tableIdentifier.replace(".", "_");
+        this.topic = topic.replace(".", "_");
         this.partition = partition;
         Random random = new Random();
         labelRandomSuffix = new AtomicLong(random.nextInt(1000));
@@ -44,15 +44,21 @@ public class LabelGenerator {
 
     public String generateLabel(long lastOffset) {
         StringBuilder sb = new StringBuilder();
-        sb.append(topic)
-                .append(LoadConstants.FILE_DELIM_DEFAULT)
-                .append(partition)
-                .append(LoadConstants.FILE_DELIM_DEFAULT)
-                .append(tableIdentifier)
+        sb.append(this.buildLabelPrefix())
                 .append(LoadConstants.FILE_DELIM_DEFAULT)
                 .append(lastOffset)
                 .append(LoadConstants.FILE_DELIM_DEFAULT)
                 .append(labelRandomSuffix.getAndIncrement());
         return sb.toString();
     }
+
+    public String buildLabelPrefix() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(topic)
+                .append(LoadConstants.FILE_DELIM_DEFAULT)
+                .append(partition)
+                .append(LoadConstants.FILE_DELIM_DEFAULT)
+                .append(tableIdentifier);
+        return sb.toString();
+    }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
index 0144a7f..67d63e3 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
@@ -49,7 +49,7 @@ public class StreamLoadWriter extends DorisWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(StreamLoadWriter.class);
     private static final String TRANSACTION_LABEL_PATTEN =
-            "SHOW TRANSACTION FROM %s WHERE LABEL LIKE '";
+            "SHOW TRANSACTION FROM %s WHERE LABEL LIKE '%s%%'";
     private List<DorisCommittable> committableList = new LinkedList<>();
     private final LabelGenerator labelGenerator;
     private final DorisCommitter dorisCommitter;
@@ -110,18 +110,11 @@ public class StreamLoadWriter extends DorisWriter {
      */
     @VisibleForTesting
     public Map<String, String> fetchLabel2Status() {
-        String queryPatten = String.format(TRANSACTION_LABEL_PATTEN, dorisOptions.getDatabase());
-        String tmpTableIdentifier = tableIdentifier.replaceAll("\\.", "_");
-        String tmpTopic = topic.replaceAll("\\.", "_");
         String querySQL =
-                queryPatten
-                        + tmpTopic
-                        + LoadConstants.FILE_DELIM_DEFAULT
-                        + partition
-                        + LoadConstants.FILE_DELIM_DEFAULT
-                        + tmpTableIdentifier
-                        + LoadConstants.FILE_DELIM_DEFAULT
-                        + "%'";
+                String.format(
+                        TRANSACTION_LABEL_PATTEN,
+                        dorisOptions.getDatabase(),
+                        labelGenerator.buildLabelPrefix());
         LOG.info("query doris offset by sql: {}", querySQL);
         Map<String, String> label2Status = new HashMap<>();
         try (Connection connection = connectionProvider.getOrEstablishConnection();
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java
index 46d9a65..d9c0a7e 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java
@@ -21,7 +21,6 @@ package org.apache.doris.kafka.connector.writer.commit;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,15 +60,11 @@ public class DorisCommitter {
             return;
         }
         for (DorisCommittable dorisCommittable : dorisCommittables) {
-            try {
-                commitTransaction(dorisCommittable);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
+            commitTransaction(dorisCommittable);
         }
     }
 
-    private void commitTransaction(DorisCommittable committable) throws IOException {
+    private void commitTransaction(DorisCommittable committable) {
         // basic params
         HttpPutBuilder builder =
                 new HttpPutBuilder()
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java
index a0e4596..a1228df 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java
@@ -134,7 +134,7 @@ public class CopyLoad extends DataLoad {
                                     return true;
                                 }
                             }
-                            LOG.error("commit failed, cause by: " + loadResult);
+                            LOG.error("commit failed, cause by: {}", loadResult);
                             throw new CopyLoadException("commit failed, cause by: " + loadResult);
                         }
                     });


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to