This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 7a0d6be [Enhancement] Optimize some code, including querying label transaction status and others (#62) 7a0d6be is described below commit 7a0d6be309d0fe5266e41e6f8e38b1a0266a2fd1 Author: wangchuang <59386838+chuang-wang-...@users.noreply.github.com> AuthorDate: Wed Jan 22 10:24:30 2025 +0800 [Enhancement] Optimize some code, including querying label transaction status and others (#62) --- .../doris/kafka/connector/DorisSinkConnector.java | 2 +- .../connection/JdbcConnectionProvider.java | 8 +++---- .../connector/metrics/DorisConnectMonitor.java | 2 +- .../connector/metrics/MetricsJmxReporter.java | 2 +- .../doris/kafka/connector/service/RestService.java | 5 ++--- .../connector/utils/BackoffAndRetryUtils.java | 4 ++-- .../kafka/connector/writer/LabelGenerator.java | 26 +++++++++++++--------- .../kafka/connector/writer/StreamLoadWriter.java | 17 +++++--------- .../connector/writer/commit/DorisCommitter.java | 9 ++------ .../kafka/connector/writer/load/CopyLoad.java | 2 +- 10 files changed, 35 insertions(+), 42 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java index 0bf90bd..91347e4 100644 --- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java +++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java @@ -79,7 +79,7 @@ public class DorisSinkConnector extends SinkConnector { List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks); for (int i = 0; i < maxTasks; i++) { Map<String, String> conf = new HashMap<>(config); - conf.put(ConfigCheckUtils.TASK_ID, i + ""); + conf.put(ConfigCheckUtils.TASK_ID, String.valueOf(i)); taskConfigs.add(conf); } return taskConfigs; diff --git a/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java b/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java index 57a83fe..47da2d9 100644 --- a/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java +++ b/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java @@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory; public class JdbcConnectionProvider implements ConnectionProvider, Serializable { private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionProvider.class); - protected final String driverName = "com.mysql.jdbc.Driver"; - protected final String cjDriverName = "com.mysql.cj.jdbc.Driver"; + protected static final String DRIVER_NAME = "com.mysql.jdbc.Driver"; + protected static final String CJ_DRIVER_NAME = "com.mysql.cj.jdbc.Driver"; private static final String JDBC_URL_TEMPLATE = "jdbc:mysql://%s"; private static final long serialVersionUID = 1L; @@ -51,11 +51,11 @@ public class JdbcConnectionProvider implements ConnectionProvider, Serializable return connection; } try { - Class.forName(cjDriverName); + Class.forName(CJ_DRIVER_NAME); } catch (ClassNotFoundException ex) { LOG.warn( "can not found class com.mysql.cj.jdbc.Driver, use class com.mysql.jdbc.Driver"); - Class.forName(driverName); + Class.forName(DRIVER_NAME); } String jdbcUrl = String.format(JDBC_URL_TEMPLATE, options.getQueryUrl()); if (!Objects.isNull(options.getUser())) { diff --git a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java index c3b28a2..0fe7d00 100644 --- a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java +++ b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java @@ -81,7 +81,7 @@ public class DorisConnectMonitor { // partition reassignment LOG.debug( "Registering metrics existing:{}", - metricsJmxReporter.getMetricRegistry().getMetrics().keySet().toString()); + metricsJmxReporter.getMetricRegistry().getMetrics().keySet()); metricsJmxReporter.removeMetricsFromRegistry(String.valueOf(taskId)); try { diff --git a/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java b/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java index dc9da31..61fe552 100644 --- a/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java +++ b/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java @@ -89,7 +89,7 @@ public class MetricsJmxReporter { "Metric registry:{}, size is:{}, names:{}", prefixFilter, metricRegistry.getMetrics().size(), - metricRegistry.getMetrics().keySet().toString()); + metricRegistry.getMetrics().keySet()); } } diff --git a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java index 7662aaa..45942f4 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java +++ b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java @@ -85,8 +85,7 @@ public class RestService { HttpGet httpGet = new HttpGet(beUrl); String response = send(options, httpGet, logger); logger.info("Backend Info:{}", response); - List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger); - return backends; + return parseBackendV2(response, logger); } catch (ConnectedFailedException e) { logger.info( "Doris FE node {} is unavailable: {}, Request the next Doris FE node", @@ -313,7 +312,7 @@ public class RestService { /** Get table schema from doris. */ public static Schema getSchema( DorisOptions dorisOptions, String db, String table, Logger logger) { - logger.trace("start get " + db + "." + table + " schema from doris."); + logger.trace("start get {}.{} schema from doris.", db, table); Object responseData = null; try { String tableSchemaUri = diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java index fe51b52..035b70e 100644 --- a/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java +++ b/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java @@ -32,7 +32,7 @@ public class BackoffAndRetryUtils { private static final int[] backoffSec = {0, 1, 2, 4}; /** Interfaces to define the lambda function to be used by backoffAndRetry */ - public interface backoffFunction { + public interface BackoffFunction { Object apply() throws Exception; } @@ -45,7 +45,7 @@ public class BackoffAndRetryUtils { * @throws Exception if the runnable function throws exception */ public static Object backoffAndRetry( - final LoadOperation operation, final backoffFunction runnable) throws Exception { + final LoadOperation operation, final BackoffFunction runnable) throws Exception { for (final int iteration : backoffSec) { if (iteration != 0) { Thread.sleep(iteration * 1000L); diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java index daca8b1..0d25c6f 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java @@ -24,9 +24,9 @@ import java.util.concurrent.atomic.AtomicLong; /** Generator label for stream load. */ public class LabelGenerator { - private String topic; - private int partition; - private String tableIdentifier; + private final String topic; + private final int partition; + private final String tableIdentifier; // The label of doris stream load cannot be repeated when loading. // Under special circumstances (usually load failure) when doris-kafka-connector is started, // stream load is performed at the same offset every time, which will cause label duplication. @@ -35,8 +35,8 @@ public class LabelGenerator { public LabelGenerator(String topic, int partition, String tableIdentifier) { // The label of stream load can not contain `.` - this.tableIdentifier = tableIdentifier.replaceAll("\\.", "_"); - this.topic = topic.replaceAll("\\.", "_"); + this.tableIdentifier = tableIdentifier.replace(".", "_"); + this.topic = topic.replace(".", "_"); this.partition = partition; Random random = new Random(); labelRandomSuffix = new AtomicLong(random.nextInt(1000)); @@ -44,15 +44,21 @@ public class LabelGenerator { public String generateLabel(long lastOffset) { StringBuilder sb = new StringBuilder(); - sb.append(topic) - .append(LoadConstants.FILE_DELIM_DEFAULT) - .append(partition) - .append(LoadConstants.FILE_DELIM_DEFAULT) - .append(tableIdentifier) + sb.append(this.buildLabelPrefix()) .append(LoadConstants.FILE_DELIM_DEFAULT) .append(lastOffset) .append(LoadConstants.FILE_DELIM_DEFAULT) .append(labelRandomSuffix.getAndIncrement()); return sb.toString(); } + + public String buildLabelPrefix() { + StringBuilder sb = new StringBuilder(); + sb.append(topic) + .append(LoadConstants.FILE_DELIM_DEFAULT) + .append(partition) + .append(LoadConstants.FILE_DELIM_DEFAULT) + .append(tableIdentifier); + return sb.toString(); + } } diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java index 0144a7f..67d63e3 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java @@ -49,7 +49,7 @@ public class StreamLoadWriter extends DorisWriter { private static final Logger LOG = LoggerFactory.getLogger(StreamLoadWriter.class); private static final String TRANSACTION_LABEL_PATTEN = - "SHOW TRANSACTION FROM %s WHERE LABEL LIKE '"; + "SHOW TRANSACTION FROM %s WHERE LABEL LIKE '%s%%'"; private List<DorisCommittable> committableList = new LinkedList<>(); private final LabelGenerator labelGenerator; private final DorisCommitter dorisCommitter; @@ -110,18 +110,11 @@ public class StreamLoadWriter extends DorisWriter { */ @VisibleForTesting public Map<String, String> fetchLabel2Status() { - String queryPatten = String.format(TRANSACTION_LABEL_PATTEN, dorisOptions.getDatabase()); - String tmpTableIdentifier = tableIdentifier.replaceAll("\\.", "_"); - String tmpTopic = topic.replaceAll("\\.", "_"); String querySQL = - queryPatten - + tmpTopic - + LoadConstants.FILE_DELIM_DEFAULT - + partition - + LoadConstants.FILE_DELIM_DEFAULT - + tmpTableIdentifier - + LoadConstants.FILE_DELIM_DEFAULT - + "%'"; + String.format( + TRANSACTION_LABEL_PATTEN, + dorisOptions.getDatabase(), + labelGenerator.buildLabelPrefix()); LOG.info("query doris offset by sql: {}", querySQL); Map<String, String> label2Status = new HashMap<>(); try (Connection connection = connectionProvider.getOrEstablishConnection(); diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java index 46d9a65..d9c0a7e 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java @@ -21,7 +21,6 @@ package org.apache.doris.kafka.connector.writer.commit; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,15 +60,11 @@ public class DorisCommitter { return; } for (DorisCommittable dorisCommittable : dorisCommittables) { - try { - commitTransaction(dorisCommittable); - } catch (IOException e) { - throw new RuntimeException(e); - } + commitTransaction(dorisCommittable); } } - private void commitTransaction(DorisCommittable committable) throws IOException { + private void commitTransaction(DorisCommittable committable) { // basic params HttpPutBuilder builder = new HttpPutBuilder() diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java index a0e4596..a1228df 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java @@ -134,7 +134,7 @@ public class CopyLoad extends DataLoad { return true; } } - LOG.error("commit failed, cause by: " + loadResult); + LOG.error("commit failed, cause by: {}", loadResult); throw new CopyLoadException("commit failed, cause by: " + loadResult); } }); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org