This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4bb08099d91 HIVE-29188: [hiveACIDReplication] Add src and tgt commit
time in replication metrics for better monitoring (#6071)
4bb08099d91 is described below
commit 4bb08099d91acbefee73a449a36abb1ecd2b5925
Author: Shivam Kumar <[email protected]>
AuthorDate: Tue Sep 16 08:15:05 2025 +0530
HIVE-29188: [hiveACIDReplication] Add src and tgt commit time in
replication metrics for better monitoring (#6071)
* Details:
* Currently, while Hive ACID replication is running, there is no visibility
into how far the replication has progressed.
* Even when replication is not running, there is no way to tell how far
the target lags behind the source.
* This commit adds this information to replication_metrics.
Co-authored-by: shivam02 <[email protected]>
---
.../ql/parse/repl/load/message/AbortTxnHandler.java | 14 ++++++++++++++
.../ql/parse/repl/load/message/CommitTxnHandler.java | 14 ++++++++++++++
.../parse/repl/metric/ReplicationMetricCollector.java | 16 ++++++++++++++++
.../hadoop/hive/ql/parse/repl/metric/event/Stage.java | 19 +++++++++++++++++++
.../llap/replication_metrics_ingest.q.out | 4 ++--
5 files changed, 65 insertions(+), 2 deletions(-)
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
index c92ef253de8..6cd3965757a 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.parse.repl.load.message;
import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -27,6 +28,8 @@
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
/**
* AbortTxnHandler
@@ -43,6 +46,17 @@ public List<Task<?>> handle(Context context)
AbortTxnMessage msg =
deserializer.getAbortTxnMessage(context.dmd.getPayload());
+ // Record the timestamp of each abort txn that wrote to this database
+ // (non-empty write IDs) in the 'progress' metric, to compute the src/tgt lag.
+ List<Long> writeIds = msg.getWriteIds();
+ List<String> databases = Optional.ofNullable(msg.getDbsUpdated())
+ .orElse(Collections.emptyList())
+ .stream()
+ .map(StringUtils::normalizeIdentifier)
+ .toList();
+ if (databases.contains(context.dbName) && writeIds != null &&
!writeIds.isEmpty()) {
+ context.getMetricCollector().setSrcTimeInProgress(msg.getTimestamp());
+ }
+
Task<ReplTxnWork> abortTxnTask = TaskFactory.get(
new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName),
context.dbName, null,
msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN,
context.eventOnlyReplicationSpec(),
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
index 2224793a059..65329a8f0d3 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -33,6 +34,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
/**
* CommitTxnHandler
@@ -54,6 +57,17 @@ public List<Task<?>> handle(Context context)
String tableNamePrev = null;
String tblName = null;
+ // Record the timestamp of each commit txn that wrote to this database
+ // (non-empty write IDs) in the 'progress' metric, to compute the src/tgt lag.
+ List<Long> writeIds = msg.getWriteIds();
+ List<String> databases = Optional.ofNullable(msg.getDatabases())
+ .orElse(Collections.emptyList())
+ .stream()
+ .map(StringUtils::normalizeIdentifier)
+ .toList();
+ if (databases.contains(dbName) && writeIds != null && !writeIds.isEmpty())
{
+ context.getMetricCollector().setSrcTimeInProgress(msg.getTimestamp());
+ }
+
ReplTxnWork work = new
ReplTxnWork(HiveUtils.getReplPolicy(context.dbName), context.dbName,
null, msg.getTxnId(),
ReplTxnWork.OperationType.REPL_COMMIT_TXN,
context.eventOnlyReplicationSpec(),
context.getDumpDirectory(),
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
index 87990dbaafc..d6601aabf26 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
@@ -136,6 +136,22 @@ public void reportFailoverStart(String stageName,
Map<String, Long> metricMap,
}
}
+ /**
+ * Stamps the REPL_LOAD stage of the current replication metric with the source-side
+ * timestamp of a replicated transaction event and the current target-side time, then
+ * hands the updated metric back to the metric collector.
+ * <p>
+ * In this change it is called from both the commit-txn and abort-txn load handlers with
+ * the event message's timestamp. It does nothing when this collector is disabled, or
+ * when the progress does not (yet) contain a REPL_LOAD stage.
+ *
+ * @param endTimeOnSrc timestamp taken from the source-side transaction message
+ * @throws SemanticException presumably propagated from metricCollector.addMetric — confirm
+ */
+ public void setSrcTimeInProgress(long endTimeOnSrc) throws SemanticException
{
+ if (isEnabled) {
+ // The wording says "commit", but abort-txn events are routed here as well.
+ LOG.debug("Updating last commit time on src in progress as: {}",
endTimeOnSrc);
+ Progress progress = replicationMetric.getProgress();
+ Stage stage = progress.getStageByName("REPL_LOAD");
+ // Only the load stage tracks src/tgt times; nothing to update if it is absent.
+ if (stage == null) {
+ return;
+ }
+ // NOTE(review): the target time comes from getCurrentTimeInMillis(); confirm the
+ // message timestamp passed in as endTimeOnSrc is also in milliseconds, otherwise
+ // any src/tgt lag derived from these two fields will be skewed.
+ stage.setEndTimeOnSrc(endTimeOnSrc);
+ stage.setEndTimeOnTgt(getCurrentTimeInMillis());
+ // Re-register the updated stage and publish the refreshed metric.
+ progress.addStage(stage);
+ replicationMetric.setProgress(progress);
+ metricCollector.addMetric(replicationMetric);
+ }
+ }
+
public void reportStageEnd(String stageName, Status status, long lastReplId,
SnapshotUtils.ReplSnapshotCount replSnapshotCount, ReplStatsTracker
replStatsTracker) throws SemanticException {
unRegisterMBeanSafe();
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
index 83df9f06eca..09bd15e8a04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
@@ -32,6 +32,8 @@ public class Stage {
private Status status;
private long startTime;
private long endTime;
+ private long endTimeOnSrc;
+ private long endTimeOnTgt;
private Map<String, Metric> metrics = new HashMap<>();
private String errorLogPath;
private SnapshotUtils.ReplSnapshotCount replSnapshotCount = new
SnapshotUtils.ReplSnapshotCount();
@@ -58,6 +60,8 @@ public Stage(Stage stage) {
this.errorLogPath = stage.errorLogPath;
this.replSnapshotCount = stage.replSnapshotCount;
this.replStats = stage.replStats;
+ this.endTimeOnSrc = stage.endTimeOnSrc;
+ this.endTimeOnTgt = stage.endTimeOnTgt;
}
public String getName() {
@@ -92,6 +96,21 @@ public void setEndTime(long endTime) {
this.endTime = endTime;
}
+ /** Returns the source-side end timestamp stored on this stage (0 until set). */
+ public long getEndTimeOnSrc() {
+ return this.endTimeOnSrc;
+ }
+
+ /** Returns the target-side end timestamp stored on this stage (0 until set). */
+ public long getEndTimeOnTgt() {
+ return this.endTimeOnTgt;
+ }
+
+ /** @param endTimeOnSrc source-side end timestamp to store on this stage */
+ public void setEndTimeOnSrc(final long endTimeOnSrc) {
+ this.endTimeOnSrc = endTimeOnSrc;
+ }
+
+ /** @param endTimeOnTgt target-side end timestamp to store on this stage */
+ public void setEndTimeOnTgt(final long endTimeOnTgt) {
+ this.endTimeOnTgt = endTimeOnTgt;
+ }
public void addMetric(Metric metric) {
this.metrics.put(metric.getName(), metric);
diff --git
a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
index 6fd00f4471a..ddf92089b7d 100644
--- a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
+++ b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
@@ -92,5 +92,5 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: sys@replication_metrics
POSTHOOK: Input: sys@replication_metrics_orig
#### A masked pattern was here ####
-repl1 1
{"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
H4sIAAAAAAAA/22PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA
{"status":"SUCCESS","stages":[{"name":"REPL [...]
-repl2 1
{"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
H4sIAAAAAAAA/22PwQqDMBBE/yVnD/XqzUYLBbFS9VSkBF1UiImsm5Pk3xu1CtLedmb3zbAzm0iQmVjA8pLzOM+Zt1gtOOs1MyUGcLtnnCXv5BFG2/YPgFT0y+nFY6CaYx6AsK9PWbcy5cX9kS5gbRBBEddG0XpPmoTcpfUOqAivSfxL+GfCt5WrR9SY6DYT1LFAGSk9hjDKXIlx6vSOumgzcARB0KzVTkYgYZP2y7hfpy3EVvYDvpfiNy0BAAA=
{"status [...]
+repl1 1
{"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
H4sIAAAAAAAA/22PsQ6CMBRF/6Uzg6xsWjExQSC2TIaYBhsgKS15fZ1I/90iwYi6tefe0/c6EYsCnSUJYRWlKWMkmlErA7pNRItBhuyaltn9WF3KJf0jAPJ+ru4iIvXj+1xoBs0W8BZfYJAIfbOZdqpyys9FPj/dOACpkRqnlz4aFGq9+ugt8f0hS3+NeGvEvg47ABjITFsK7EiinVIRATkqpsVoO7OqH0H4sl2Ar/0T5NGeBTQBAAA=
{"status":"SUCCESS","stages":[{ [...]
+repl2 1
{"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
H4sIAAAAAAAA/22QQQuDMAyF/0vPHubV21YdDERl1dOQUVzQQW0lpifpf1/VOXDbLe8l30vIxEaSZEcWMVFxngjBgtlqwVu3iWnZg+9dkyK9p/kxXrt/AKTyOY8eAgb68V3nWmCzN8qWFqMHwmez23auMl5e8myObiwiaOLG6nWeDEm1SRd8oPJ4SpNfItwToav9DYgGU9MWkjoWaatUwBAGJbQcxs5sqI+2PUeQBI9ltZcxKFilezP+G+Ma4mr3Atju2TJPA
[...]