morningman closed pull request #350: Optimize the publish logic of streaming
load
URL: https://github.com/apache/incubator-doris/pull/350
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index c608de08..a2902daa 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -330,6 +330,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
}
auto str = ctx->to_json();
HttpChannel::send_reply(req, str);
+ k_streaming_load_current_processing.increment(-1);
return -1;
}
return 0;
diff --git a/be/test/olap/file_helper_test.cpp
b/be/test/olap/file_helper_test.cpp
index ed21e001..90d8e46a 100644
--- a/be/test/olap/file_helper_test.cpp
+++ b/be/test/olap/file_helper_test.cpp
@@ -91,10 +91,10 @@ TEST_F(FileHandlerTest, TestWrite) {
ASSERT_EQ(22, length);
- char* large_bytes2[(1 << 12)];
+ char* large_bytes2[(1 << 10)];
memset(large_bytes2, 0, sizeof(char)*((1 << 12)));
int i = 1;
- while (i < 1 << 20) {
+ while (i < 1 << 17) {
file_handler.write(large_bytes2, ((1 << 12)));
++i;
}
diff --git a/fe/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/src/main/java/org/apache/doris/catalog/Replica.java
index f6046057..a51f1995 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Replica.java
@@ -20,6 +20,7 @@
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -178,6 +179,25 @@ public synchronized void updateVersionInfo(long
newVersion, long newVersionHash,
lastSuccessVersion, lastSuccessVersionHash, dataSize,
rowCount);
}
+ /* last failed version: LFV
+ * last success version: LSV
+ * version: V
+ *
+ * Case 1:
+ * If LFV > LSV, set LSV back to V, which indicates that the versions
between LSV and LFV are invalid.
+ * The clone task will clone the versions between LSV and LFV.
+ *
+ * Case 2:
+ * LFV changed, set LSV back to V. This is just the same as Case 1, because
LFV must be larger than LSV.
+ *
+ * Case 3:
+ * LFV remains unchanged, just update LSV, and then check if it falls
into Case 1.
+ *
+ * Case 4:
+ * V is larger than or equal to LFV, reset LFV. And if V is less than LSV,
just set V to LSV. This may
+ * happen when a clone task finishes and reports version V, but the
LSV is already larger than V,
+ * and we know that the versions between V and LSV are valid, so move V
forward to LSV.
+ */
private void updateReplicaInfo(long newVersion, long newVersionHash,
long lastFailedVersion, long lastFailedVersionHash,
long lastSuccessVersion, long lastSuccessVersionHash,
@@ -196,11 +216,14 @@ private void updateReplicaInfo(long newVersion, long
newVersionHash,
lastSuccessVersion = this.version;
lastSuccessVersionHash = this.versionHash;
}
+
+ // case 1:
if (this.lastSuccessVersion <= this.lastFailedVersion) {
this.lastSuccessVersion = this.version;
this.lastSuccessVersionHash = this.versionHash;
}
+ // TODO: this case is unknown, add log to observe
if (this.version > lastFailedVersion && lastFailedVersion > 0) {
LOG.info("current version {} is larger than last failed version {}
, "
+ "last failed version hash {}, maybe a fatal error or
be report version, print a stack here ",
@@ -209,15 +232,17 @@ private void updateReplicaInfo(long newVersion, long
newVersionHash,
if (lastFailedVersion != this.lastFailedVersion
|| this.lastFailedVersionHash != lastFailedVersionHash) {
- // if last failed version changed, then set last success version
to invalid version
+ // Case 2:
if (lastFailedVersion > this.lastFailedVersion) {
this.lastFailedVersion = lastFailedVersion;
this.lastFailedVersionHash = lastFailedVersionHash;
this.lastFailedTimestamp = System.currentTimeMillis();
}
+
this.lastSuccessVersion = this.version;
this.lastSuccessVersionHash = this.versionHash;
} else {
+ // Case 3:
if (lastSuccessVersion >= this.lastSuccessVersion) {
this.lastSuccessVersion = lastSuccessVersion;
this.lastSuccessVersionHash = lastSuccessVersionHash;
@@ -228,9 +253,7 @@ private void updateReplicaInfo(long newVersion, long
newVersionHash,
}
}
- // if last failed version <= version, then last failed version is
invalid
- // version xxxx | last failed version xxxx | last success version xxx
- // if current version == last failed version and version hash != last
failed version hash, it means the version report from be is not valid
+ // Case 4:
if (this.version > this.lastFailedVersion
|| this.version == this.lastFailedVersion && this.versionHash
== this.lastFailedVersionHash
|| this.version == this.lastFailedVersion &&
this.lastFailedVersionHash == 0 && this.versionHash != 0) {
@@ -242,7 +265,7 @@ private void updateReplicaInfo(long newVersion, long
newVersionHash,
this.versionHash = this.lastSuccessVersionHash;
}
}
- // TODO yiguolei use info log here, there maybe a lot of logs, change
it to debug when concurrent load is stable
+
LOG.debug("update {}", this.toString());
}
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java
b/fe/src/main/java/org/apache/doris/common/Config.java
index 75ac8579..fa13718a 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -241,6 +241,9 @@
* if (current_time - t1) > 300s, then palo will treat C as a failure
node
* will call transaction manager to commit the transaction and tell
transaction manager
* that C is failed
+ *
+ * This is also used when waiting for publish tasks
+ *
* TODO this parameter is the default value for all job and the DBA could
specify it for separate job
*/
@ConfField public static int load_straggler_wait_second = 300;
diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
index 3a9b7ac4..b96d4a03 100644
--- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -46,9 +46,9 @@
import org.apache.doris.task.CloneTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.CreateRollupTask;
-import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
+import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.task.PushTask;
import org.apache.doris.task.SchemaChangeTask;
import org.apache.doris.task.SnapshotTask;
@@ -555,7 +555,7 @@ private void finishPublishVersion(AgentTask task,
TFinishTaskRequest request) {
if (request.isSetError_tablet_ids()) {
errorTabletIds = request.getError_tablet_ids();
}
- PublishVersionTask publishVersionTask = (PublishVersionTask)task;
+ PublishVersionTask publishVersionTask = (PublishVersionTask) task;
publishVersionTask.addErrorTablets(errorTabletIds);
publishVersionTask.setIsFinished(true);
AgentTaskQueue.removeTask(publishVersionTask.getBackendId(),
diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
index 0b98e108..111c4df8 100644
--- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -17,9 +17,6 @@
package org.apache.doris.metric;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
-
import org.apache.doris.alter.Alter;
import org.apache.doris.alter.AlterJob.JobType;
import org.apache.doris.catalog.Catalog;
@@ -33,6 +30,10 @@
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -57,6 +58,8 @@
public static LongCounterMetric COUNTER_EDIT_LOG_READ;
public static LongCounterMetric COUNTER_IMAGE_WRITE;
public static LongCounterMetric COUNTER_IMAGE_PUSH;
+ public static LongCounterMetric COUNTER_TXN_FAILED;
+ public static LongCounterMetric COUNTER_TXN_SUCCESS;
public static Histogram HISTO_QUERY_LATENCY;
public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
@@ -161,6 +164,12 @@ public Long getValue() {
COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push",
"counter of image succeeded in pushing to other frontends");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH);
+ COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success",
+ "counter of success transactions");
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_SUCCESS);
+ COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed",
+ "counter of failed transactions");
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_FAILED);
// 3. histogram
HISTO_QUERY_LATENCY =
METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms"));
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index ffce942b..9c6d2668 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -634,7 +634,7 @@ public TLoadTxnCommitResult
loadTxnCommit(TLoadTxnCommitRequest request) throws
return result;
}
- // return true if commit success and publish success, return false if
publish timout
+ // return true if commit success and publish success, return false if
publish timeout
private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws
UserException {
String cluster = request.getCluster();
if (Strings.isNullOrEmpty(cluster)) {
@@ -655,6 +655,7 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest
request) throws UserExce
}
throw new UserException("unknown database, database=" + dbName);
}
+
return
Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
db, request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()),
diff --git a/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java
b/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java
index 97e31a6d..c65cfe05 100644
--- a/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java
+++ b/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java
@@ -17,15 +17,15 @@
package org.apache.doris.task;
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.doris.thrift.TPartitionVersionInfo;
+import org.apache.doris.thrift.TPublishVersionRequest;
+import org.apache.doris.thrift.TTaskType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.apache.doris.thrift.TPartitionVersionInfo;
-import org.apache.doris.thrift.TPublishVersionRequest;
-import org.apache.doris.thrift.TTaskType;
+import java.util.ArrayList;
+import java.util.List;
public class PublishVersionTask extends AgentTask {
private static final Logger LOG =
LogManager.getLogger(PublishVersionTask.class);
diff --git
a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index d3da6a0b..35bd20c8 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -17,12 +17,6 @@
package org.apache.doris.transaction;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
import org.apache.doris.alter.RollupJob;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
@@ -49,6 +43,13 @@
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -82,6 +83,7 @@
// transactionId -> TransactionState
private Map<Long, TransactionState> idToTransactionState;
+ // db id -> (label -> txn id)
private com.google.common.collect.Table<Long, String, Long>
dbIdToTxnLabels;
private Map<Long, Integer> runningTxnNums;
private TransactionIdGenerator idGenerator;
@@ -107,7 +109,7 @@ public long beginTransaction(long dbId, String label,
String coordinator, LoadJo
throws AnalysisException, LabelAlreadyExistsException,
BeginTransactionException {
if (Config.disable_load_job) {
- throw new BeginTransactionException("disable_load_job is set to
true, all load job is prevented");
+ throw new BeginTransactionException("disable_load_job is set to
true, all load jobs are prevented");
}
writeLock();
@@ -185,12 +187,11 @@ public void deleteTransaction(long transactionId) {
*/
public void commitTransaction(long dbId, long transactionId,
List<TabletCommitInfo> tabletCommitInfos)
throws MetaNotFoundException, TransactionCommitFailedException {
-
if (Config.disable_load_job) {
- throw new TransactionCommitFailedException("disable_load_job is
set to true, all load job is prevented");
+ throw new TransactionCommitFailedException("disable_load_job is
set to true, all load jobs are prevented");
}
- LOG.debug("try to commit transaction:[{}]", transactionId);
+ LOG.debug("try to commit transaction: {}", transactionId);
if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
throw new TransactionCommitFailedException("all partitions have no
load data");
}
@@ -260,7 +261,8 @@ public void commitTransaction(long dbId, long
transactionId, List<TabletCommitIn
}
// the rolling up index should also be taken care
// if the rollup index failed during load, then set its last
failed version
- // if rollup task finished, it should compare version and last
failed version, if version < last failed version, then the replica is failed
+ // if rollup task finished, it should compare version and last
failed version,
+ // if version < last failed version, then the replica is failed
if (rollingUpIndex != null) {
allIndices.add(rollingUpIndex);
}
@@ -287,11 +289,12 @@ public void commitTransaction(long dbId, long
transactionId, List<TabletCommitIn
// ignore it but not log it
// for example, a replica is in clone state
if (replica.getLastFailedVersion() < 0) {
- ++ successReplicaNum;
+ ++successReplicaNum;
} else {
// if this error replica is a base replica
and it is under rollup
// then remove the rollup task and rollup
job will remove the rollup replica automatically
- // should remove here, because the error
replicas not contains this base replica, but it have errors in the past
+ // should remove here, because the error
replicas not contains this base replica,
+ // but it has errors in the past
if (index.getId() == baseIndex.getId() &&
rollupJob != null) {
LOG.info("the base replica [{}] has
error, remove the related rollup replica from rollupjob [{}]",
replica, rollupJob);
@@ -340,12 +343,16 @@ public void commitTransaction(long dbId, long
transactionId, List<TabletCommitIn
}
// 5. persistent transactionState
unprotectUpsertTransactionState(transactionState);
+
+ // add publish version tasks. set task to null as a placeholder.
+ // tasks will be created when publishing version.
for (long backendId : totalInvolvedBackends) {
transactionState.addPublishVersionTask(backendId, null);
}
} finally {
writeUnlock();
}
+
// 6. update nextVersion because of the failure of persistent
transaction resulting in error version
updateCatalogAfterCommitted(transactionState, db);
LOG.info("transaction:[{}] successfully committed", transactionState);
@@ -385,7 +392,6 @@ public boolean commitAndPublishTransaction(Database db,
long transactionId,
}
public void abortTransaction(long transactionId, String reason) throws
UserException {
-
if (transactionId < 0) {
LOG.info("transaction id is {}, less than 0, maybe this is an old
type load job, ignore abort operation", transactionId);
return;
@@ -897,12 +903,14 @@ private void updateCatalogAfterCommitted(TransactionState
transactionState, Data
}
}
partition.setNextVersion(partition.getNextVersion() + 1);
- // the partition's current version hash should be set from
partition commit info
- // for example, fe master's partition current version hash is
123123, fe followers partition current version hash is 3333
- // they are different, fe master changed, the follower is
master now, but its current version hash is 333, if clone happened,
- // clone finished but its finished version hash != partition's
current version hash, then clone is failed
- // because clone depend on partition's current version to clone
- partition.setNextVersionHash(Util.generateVersionHash(),
partitionCommitInfo.getVersionHash());
+ // Although the committed version(hash) is not visible to the user,
+ // it still needs to be synchronized among Frontends,
+ // because we use the committed version(hash) to create the clone
task. If the first Master FE
+ // sends a clone task with committed version hash X, and then the
Master changes, the new Master FE
+ // receives the clone task report with version hash X, which
does not equal its own committed
+ // version hash, so the clone task fails.
+ partition.setNextVersionHash(Util.generateVersionHash() /*
next version hash */,
+
partitionCommitInfo.getVersionHash() /* committed version hash*/);
}
}
}
@@ -1015,14 +1023,14 @@ private void updateDBRunningTxnNum(TransactionStatus
preStatus, TransactionState
if (preStatus == null
&& (curTxnState.getTransactionStatus() ==
TransactionStatus.PREPARE
|| curTxnState.getTransactionStatus() ==
TransactionStatus.COMMITTED)) {
- ++ dbRunningTxnNum;
+ ++dbRunningTxnNum;
runningTxnNums.put(curTxnState.getDbId(), dbRunningTxnNum);
} else if (preStatus != null
&& (preStatus == TransactionStatus.PREPARE
|| preStatus == TransactionStatus.COMMITTED)
&& (curTxnState.getTransactionStatus() ==
TransactionStatus.VISIBLE
|| curTxnState.getTransactionStatus() ==
TransactionStatus.ABORTED)) {
- -- dbRunningTxnNum;
+ --dbRunningTxnNum;
if (dbRunningTxnNum < 1) {
runningTxnNums.remove(curTxnState.getDbId());
} else {
diff --git
a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 324ff9d3..270504a7 100644
--- a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -17,11 +17,8 @@
package org.apache.doris.transaction;
-import com.google.common.collect.Sets;
-
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.Daemon;
@@ -31,6 +28,10 @@
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TTaskType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -38,14 +39,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
public class PublishVersionDaemon extends Daemon {
private static final Logger LOG =
LogManager.getLogger(PublishVersionDaemon.class);
public PublishVersionDaemon() {
- super("PUBLISH_VERSION");
- setInterval(Config.publish_version_interval_millis);
+ super("PUBLISH_VERSION", Config.publish_version_interval_millis);
}
protected void runOneCycle() {
@@ -64,7 +65,7 @@ private void publishVersion() {
}
// TODO yiguolei: could publish transaction state according to
multi-tenant cluster info
// but should do more work. for example, if a table is migrate from
one cluster to another cluster
- // should pulish to two clusters.
+ // should publish to two clusters.
// attention here, we publish transaction state to all backends
including dead backend, if not publish to dead backend
// then transaction manager will treat it as success
List<Long> allBackends =
Catalog.getCurrentSystemInfo().getBackendIds(false);
@@ -97,13 +98,14 @@ private void publishVersion() {
}
}
Set<Long> publishBackends =
transactionState.getPublishVersionTasks().keySet();
+ // publish version tasks are not persisted in the catalog, so
publishBackends may be empty.
+ // In that case we have to try to publish to all backends.
if (publishBackends.isEmpty()) {
// could not just add to it, should new a new object, or the
back map will destroyed
publishBackends = Sets.newHashSet();
- // this is useful if fe master transfer to another master,
because publish version task is not
- // persistent to edit log, then it should publish to all
backends
publishBackends.addAll(allBackends);
}
+
for (long backendId : publishBackends) {
PublishVersionTask task = new PublishVersionTask(backendId,
transactionState.getTransactionId(),
@@ -130,6 +132,7 @@ private void publishVersion() {
}
Map<Long, PublishVersionTask> transTasks =
transactionState.getPublishVersionTasks();
Set<Replica> transErrorReplicas = Sets.newHashSet();
+ List<PublishVersionTask> unfinishedTasks = Lists.newArrayList();
for (PublishVersionTask publishVersionTask : transTasks.values()) {
if (publishVersionTask.isFinished()) {
// sometimes backend finish publish version task, but it
maybe failed to change transactionid to version for some tablets
@@ -145,44 +148,48 @@ private void publishVersion() {
}
}
} else {
- // if task is not finished in time, then set all replica
in the backend to error state
- List<TPartitionVersionInfo> versionInfos =
publishVersionTask.getPartitionVersionInfos();
- Set<Long> errorPartitionIds = Sets.newHashSet();
- for (TPartitionVersionInfo versionInfo : versionInfos) {
- errorPartitionIds.add(versionInfo.getPartition_id());
- }
- if (errorPartitionIds.isEmpty()) {
- continue;
- }
- List<Long> tabletIds =
tabletInvertedIndex.getTabletIdsByBackendId(publishVersionTask.getBackendId());
- for (long tabletId : tabletIds) {
- long partitionId =
tabletInvertedIndex.getPartitionId(tabletId);
- if (errorPartitionIds.contains(partitionId)) {
- Replica replica =
tabletInvertedIndex.getReplica(tabletId, publishVersionTask.getBackendId());
- transErrorReplicas.add(replica);
+ unfinishedTasks.add(publishVersionTask);
+ }
+ }
+
+ boolean shouldFinishTxn = false;
+ if (!unfinishedTasks.isEmpty()) {
+ if (transactionState.isPublishTimeout()) {
+ // the transaction's publish has timed out, but there are still
unfinished tasks.
+ // We need to collect all error replicas and try to
finish this txn.
+ for (PublishVersionTask unfinishedTask : unfinishedTasks) {
+ // set all replica in the backend to error state
+ List<TPartitionVersionInfo> versionInfos =
unfinishedTask.getPartitionVersionInfos();
+ Set<Long> errorPartitionIds = Sets.newHashSet();
+ for (TPartitionVersionInfo versionInfo : versionInfos)
{
+
errorPartitionIds.add(versionInfo.getPartition_id());
+ }
+ if (errorPartitionIds.isEmpty()) {
+ continue;
+ }
+
+ // TODO(cmy): this is inefficient, but just keep it
simple. will change it later.
+ List<Long> tabletIds =
tabletInvertedIndex.getTabletIdsByBackendId(unfinishedTask.getBackendId());
+ for (long tabletId : tabletIds) {
+ long partitionId =
tabletInvertedIndex.getPartitionId(tabletId);
+ if (errorPartitionIds.contains(partitionId)) {
+ Replica replica =
tabletInvertedIndex.getReplica(tabletId,
+
unfinishedTask.getBackendId());
+ transErrorReplicas.add(replica);
+ }
}
}
+
+ shouldFinishTxn = true;
}
+ // the transaction's publish has not timed out yet; wait for the next round.
+ } else {
+ // all publish tasks are finished, try to finish this txn.
+ shouldFinishTxn = true;
}
- // the timeout value is related with backend num
- long timeoutMillis =
Math.min(Config.publish_version_timeout_second * transTasks.size() * 1000,
10000);
- // the minimal internal should be 3s
- timeoutMillis = Math.max(timeoutMillis, 3000);
- // should not wait clone replica or replica's that with last
failed version > 0
- // if wait for them, the publish process will be very slow
- int normalReplicasNotRespond = 0;
- Set<Long> allErrorReplicas = Sets.newHashSet();
- for (Replica replica : transErrorReplicas) {
- allErrorReplicas.add(replica.getId());
- if (replica.getState() != ReplicaState.CLONE
- && replica.getLastFailedVersion() < 1) {
- ++normalReplicasNotRespond;
- }
- }
- if (normalReplicasNotRespond == 0
- || System.currentTimeMillis() -
transactionState.getPublishVersionTime() > timeoutMillis) {
- LOG.debug("transTask num {}, error replica id num {}",
transTasks.size(), transErrorReplicas.size());
+ if (shouldFinishTxn) {
+ Set<Long> allErrorReplicas = transErrorReplicas.stream().map(v
-> v.getId()).collect(Collectors.toSet());
globalTransactionMgr.finishTransaction(transactionState.getTransactionId(),
allErrorReplicas);
if (transactionState.getTransactionStatus() !=
TransactionStatus.VISIBLE) {
// if finish transaction state failed, then update publish
version time, should check
@@ -192,11 +199,12 @@ private void publishVersion() {
transactionState, transErrorReplicas.size());
}
}
+
if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
for (PublishVersionTask task :
transactionState.getPublishVersionTasks().values()) {
AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUBLISH_VERSION, task.getSignature());
}
}
- }
+ } // end for readyTransactionStates
}
}
diff --git
a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
index f8763490..a5dda349 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -17,8 +17,10 @@
package org.apache.doris.transaction;
+import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.task.PublishVersionTask;
import com.google.common.collect.Maps;
@@ -253,6 +255,13 @@ public void setTransactionStatus(TransactionStatus
transactionStatus) {
this.transactionStatus = transactionStatus;
if (transactionStatus == TransactionStatus.VISIBLE) {
this.latch.countDown();
+ if (MetricRepo.isInit.get()) {
+ MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
+ }
+ } else if (transactionStatus == TransactionStatus.ABORTED) {
+ if (MetricRepo.isInit.get()) {
+ MetricRepo.COUNTER_TXN_FAILED.increase(1L);
+ }
}
}
@@ -321,4 +330,12 @@ public LoadJobSourceType getSourceType() {
public Map<Long, PublishVersionTask> getPublishVersionTasks() {
return publishVersionTasks;
}
+
+ public boolean isPublishTimeout() {
+ // timeout is between 3 seconds and Config.load_straggler_wait_second
seconds.
+ long timeoutMillis = Math.min(Config.publish_version_timeout_second *
publishVersionTasks.size() * 1000,
+ Config.load_straggler_wait_second *
1000);
+ timeoutMillis = Math.max(timeoutMillis, 3000);
+ return System.currentTimeMillis() - publishVersionTime > timeoutMillis;
+ }
}
diff --git
a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
index 8f6e22ec..f5f862bb 100644
---
a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
+++
b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -17,32 +17,26 @@
package org.apache.doris.load.routineload;
-import com.google.common.collect.Lists;
-import mockit.Deencapsulation;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.persist.EditLog;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TResourceInfo;
-import org.easymock.EasyMock;
+
+import com.google.common.collect.Lists;
+
import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
+import mockit.Deencapsulation;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+
public class RoutineLoadSchedulerTest {
@Test
@@ -72,18 +66,6 @@ public void testNormalRunOneCycle(@Mocked Catalog catalog,
Deencapsulation.setField(routineLoadJob, "kafkaPartitions",
partitions);
Deencapsulation.setField(routineLoadJob, "desireTaskConcurrentNum", 3);
- new MockUp<Catalog>() {
- @Mock
- public SystemInfoService getCurrentSystemInfo() {
- return systemInfoService;
- }
-
- @Mock
- public Catalog getCurrentCatalog() {
- return catalog;
- }
- };
-
new Expectations() {
{
catalog.getRoutineLoadInstance();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]