This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 05060cb4cc4 [IOTDB-5935] Pipe: RecoverProgressIndex and
HybridProgressIndex (#9975)
05060cb4cc4 is described below
commit 05060cb4cc42337b0fb8b397c547d52e97e48194
Author: yschengzi <[email protected]>
AuthorDate: Tue May 30 22:16:25 2023 +0800
[IOTDB-5935] Pipe: RecoverProgressIndex and HybridProgressIndex (#9975)
* Implement RecoverProgressIndex, used for recovery of the StorageEngine.
* Implement HybridProgressIndex, used for updating and comparing hybrid
progress indexes.
* Update the progress index when recovering a TsFileResource.
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../consensus/iot/IoTConsensusServerImpl.java | 4 +-
.../commons/consensus/index/ProgressIndex.java | 45 +++++
.../commons/consensus/index/ProgressIndexType.java | 18 +-
.../consensus/index/impl/HybridProgressIndex.java | 221 +++++++++++++++++++++
.../consensus/index/impl/IoTProgressIndex.java | 144 +++++++++-----
.../consensus/index/impl/MinimumProgressIndex.java | 9 +-
.../consensus/index/impl/RecoverProgressIndex.java | 207 +++++++++++++++++++
.../consensus/index/impl/SimpleProgressIndex.java | 145 +++++++++-----
.../IoTConsensusDataRegionStateMachine.java | 5 +-
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 4 +
.../SimpleConsensusProgressIndexAssigner.java | 17 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 2 +-
.../file/UnsealedTsFileRecoverPerformer.java | 4 +
.../TsFileResourceProgressIndexTest.java | 6 +
14 files changed, 705 insertions(+), 126 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index ba9d2709ad2..62c498f4b18 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -649,8 +649,8 @@ public class IoTConsensusServerImpl {
public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
if (request instanceof ComparableConsensusRequest) {
- final IoTProgressIndex iotProgressIndex = new IoTProgressIndex();
- iotProgressIndex.addSearchIndex(thisNode.getNodeId(), searchIndex.get()
+ 1);
+ final IoTProgressIndex iotProgressIndex =
+ new IoTProgressIndex(thisNode.getNodeId(), searchIndex.get() + 1);
((ComparableConsensusRequest)
request).setProgressIndex(iotProgressIndex);
}
return new IndexedConsensusRequest(searchIndex.get() + 1,
Collections.singletonList(request));
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index b4a1f6aeca7..b6929a2fb91 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.commons.consensus.index;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -76,4 +79,46 @@ public interface ProgressIndex {
* @return the minimum progress index after the given progress index and
this progress index
*/
ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex
progressIndex);
+
+ /** @return the type of this progress index */
+ ProgressIndexType getType();
+
+ /**
+ * Blend two progress indexes together. The resulting progress index should
satisfy:
+ *
+ * <p>(result.equals(progressIndex1) || result.isAfter(progressIndex1)) is
true
+ *
+ * <p>(result.equals(progressIndex2) || result.isAfter(progressIndex2)) is
true
+ *
+ * <p>There is no R, such that R satisfies the above conditions and
result.isAfter(R) is true
+ *
+ * @param progressIndex1 the first progress index. if it is null, the result
progress index should
+ * be the second progress index. if it is a minimum progress index, the
result progress index
+ * should be the second progress index. (if the second progress index is
null, the result
+ * should be a minimum progress index). if it is a hybrid progress
index, the result progress
+ * index should be the minimum progress index after the second progress
index and the first
+ * progress index
+ * @param progressIndex2 the second progress index. if it is null, the
result progress index
+ * should be the first progress index. if it is a minimum progress
index, the result progress
+ * index should be the first progress index. (if the first progress
index is null, the result
+ * should be a minimum progress index). if it is a hybrid progress
index, the result progress
+ * index should be the minimum progress index after the first progress
index and the second
+ * progress index
+ * @return the minimum progress index after the first progress index and the
second progress index
+ */
+ static ProgressIndex blendProgressIndex(
+ ProgressIndex progressIndex1, ProgressIndex progressIndex2) {
+ if (progressIndex1 == null && progressIndex2 == null) {
+ return new MinimumProgressIndex();
+ }
+ if (progressIndex1 == null || progressIndex1 instanceof
MinimumProgressIndex) {
+ return progressIndex2 == null ? new MinimumProgressIndex() :
progressIndex2;
+ }
+ if (progressIndex2 == null || progressIndex2 instanceof
MinimumProgressIndex) {
+ return progressIndex1; // progressIndex1 is not null
+ }
+
+ return new HybridProgressIndex(progressIndex1.getType().getType(),
progressIndex1)
+ .updateToMinimumIsAfterProgressIndex(progressIndex2);
+ }
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
index 02afa4045df..615ce1336d9 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.commons.consensus.index;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -30,9 +32,11 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
public enum ProgressIndexType {
- MINIMUM_CONSENSUS_INDEX((short) 1),
- IOT_CONSENSUS_INDEX((short) 2),
- SIMPLE_CONSENSUS_INDEX((short) 3),
+ MINIMUM_PROGRESS_INDEX((short) 1),
+ IOT_PROGRESS_INDEX((short) 2),
+ SIMPLE_PROGRESS_INDEX((short) 3),
+ RECOVER_PROGRESS_INDEX((short) 4),
+ HYBRID_PROGRESS_INDEX((short) 5),
;
private final short type;
@@ -62,6 +66,10 @@ public enum ProgressIndexType {
return IoTProgressIndex.deserializeFrom(byteBuffer);
case 3:
return SimpleProgressIndex.deserializeFrom(byteBuffer);
+ case 4:
+ return RecoverProgressIndex.deserializeFrom(byteBuffer);
+ case 5:
+ return HybridProgressIndex.deserializeFrom(byteBuffer);
default:
throw new UnsupportedOperationException(
String.format("Unsupported progress index type %s.", indexType));
@@ -77,6 +85,10 @@ public enum ProgressIndexType {
return IoTProgressIndex.deserializeFrom(stream);
case 3:
return SimpleProgressIndex.deserializeFrom(stream);
+ case 4:
+ return RecoverProgressIndex.deserializeFrom(stream);
+ case 5:
+ return HybridProgressIndex.deserializeFrom(stream);
default:
throw new UnsupportedOperationException(
String.format("Unsupported progress index type %s.", indexType));
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
new file mode 100644
index 00000000000..dcd701cfbe6
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class HybridProgressIndex implements ProgressIndex {
+
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private final Map<Short, ProgressIndex> type2Index;
+
+ public HybridProgressIndex() {
+ this.type2Index = new HashMap<>();
+ }
+
+ public HybridProgressIndex(short type, ProgressIndex progressIndex) {
+ this.type2Index = new HashMap<>();
+ type2Index.put(type, progressIndex);
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ lock.readLock().lock();
+ try {
+ ProgressIndexType.HYBRID_PROGRESS_INDEX.serialize(byteBuffer);
+
+ ReadWriteIOUtils.write(type2Index.size(), byteBuffer);
+ for (final Map.Entry<Short, ProgressIndex> entry :
type2Index.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+ entry.getValue().serialize(byteBuffer);
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ lock.readLock().lock();
+ try {
+ ProgressIndexType.HYBRID_PROGRESS_INDEX.serialize(stream);
+
+ ReadWriteIOUtils.write(type2Index.size(), stream);
+ for (final Map.Entry<Short, ProgressIndex> entry :
type2Index.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), stream);
+ entry.getValue().serialize(stream);
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean isAfter(ProgressIndex progressIndex) {
+ lock.readLock().lock();
+ try {
+ if (progressIndex instanceof MinimumProgressIndex) {
+ return true;
+ }
+
+ if (!(progressIndex instanceof HybridProgressIndex)) {
+ final short type = progressIndex.getType().getType();
+ return type2Index.containsKey(type) &&
type2Index.get(type).isAfter(progressIndex);
+ }
+
+ final HybridProgressIndex thisHybridProgressIndex = this;
+ final HybridProgressIndex thatHybridProgressIndex =
(HybridProgressIndex) progressIndex;
+ return thatHybridProgressIndex.type2Index.entrySet().stream()
+ .noneMatch(
+ entry ->
+
!thisHybridProgressIndex.type2Index.containsKey(entry.getKey())
+ || !thisHybridProgressIndex
+ .type2Index
+ .get(entry.getKey())
+ .isAfter(entry.getValue()));
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public boolean isGivenProgressIndexAfterSelf(ProgressIndex progressIndex) {
+ return type2Index.size() == 1
+ && type2Index.containsKey(progressIndex.getType().getType())
+ &&
progressIndex.isAfter(type2Index.get(progressIndex.getType().getType()));
+ }
+
+ @Override
+ public boolean equals(ProgressIndex progressIndex) {
+ lock.readLock().lock();
+ try {
+ if (!(progressIndex instanceof HybridProgressIndex)) {
+ return false;
+ }
+
+ final HybridProgressIndex thisHybridProgressIndex = this;
+ final HybridProgressIndex thatHybridProgressIndex =
(HybridProgressIndex) progressIndex;
+ return thisHybridProgressIndex.type2Index.size() ==
thatHybridProgressIndex.type2Index.size()
+ && thatHybridProgressIndex.type2Index.entrySet().stream()
+ .allMatch(
+ entry ->
+
thisHybridProgressIndex.type2Index.containsKey(entry.getKey())
+ && thisHybridProgressIndex
+ .type2Index
+ .get(entry.getKey())
+ .equals(entry.getValue()));
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof HybridProgressIndex)) {
+ return false;
+ }
+ return this.equals((HybridProgressIndex) obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex
progressIndex) {
+ lock.writeLock().lock();
+ try {
+ if (progressIndex instanceof MinimumProgressIndex) {
+ return this;
+ }
+
+ if (!(progressIndex instanceof HybridProgressIndex)) {
+ type2Index.compute(
+ progressIndex.getType().getType(),
+ (thisK, thisV) ->
+ (thisV == null
+ ? progressIndex
+ :
thisV.updateToMinimumIsAfterProgressIndex(progressIndex)));
+ return this;
+ }
+
+ final HybridProgressIndex thisHybridProgressIndex = this;
+ final HybridProgressIndex thatHybridProgressIndex =
(HybridProgressIndex) progressIndex;
+ thatHybridProgressIndex.type2Index.forEach(
+ (thatK, thatV) ->
+ thisHybridProgressIndex.type2Index.compute(
+ thatK,
+ (thisK, thisV) ->
+ (thisV == null ? thatV :
thisV.updateToMinimumIsAfterProgressIndex(thatV))));
+ return this;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public ProgressIndexType getType() {
+ return ProgressIndexType.HYBRID_PROGRESS_INDEX;
+ }
+
+ public static HybridProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+ final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
+ final int size = ReadWriteIOUtils.readInt(byteBuffer);
+ for (int i = 0; i < size; i++) {
+ final short type = ReadWriteIOUtils.readShort(byteBuffer);
+ final ProgressIndex progressIndex =
ProgressIndexType.deserializeFrom(byteBuffer);
+ hybridProgressIndex.type2Index.put(type, progressIndex);
+ }
+ return hybridProgressIndex;
+ }
+
+ public static HybridProgressIndex deserializeFrom(InputStream stream) throws
IOException {
+ final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
+ final int size = ReadWriteIOUtils.readInt(stream);
+ for (int i = 0; i < size; i++) {
+ final short type = ReadWriteIOUtils.readShort(stream);
+ final ProgressIndex progressIndex =
ProgressIndexType.deserializeFrom(stream);
+ hybridProgressIndex.type2Index.put(type, progressIndex);
+ }
+ return hybridProgressIndex;
+ }
+
+ @Override
+ public String toString() {
+ return "HybridProgressIndex{" + "type2Index=" + type2Index + '}';
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
index d84ef20394f..60c4092f760 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
@@ -29,79 +29,107 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public class IoTProgressIndex implements ProgressIndex {
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
private final Map<Integer, Long> peerId2SearchIndex;
public IoTProgressIndex() {
peerId2SearchIndex = new HashMap<>();
}
- public void addSearchIndex(Integer peerId, Long searchIndex) {
+ public IoTProgressIndex(Integer peerId, Long searchIndex) {
+ peerId2SearchIndex = new HashMap<>();
peerId2SearchIndex.put(peerId, searchIndex);
}
@Override
public void serialize(ByteBuffer byteBuffer) {
- ProgressIndexType.IOT_CONSENSUS_INDEX.serialize(byteBuffer);
-
- ReadWriteIOUtils.write(peerId2SearchIndex.size(), byteBuffer);
- for (final Map.Entry<Integer, Long> entry : peerId2SearchIndex.entrySet())
{
- ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
- ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+ lock.readLock().lock();
+ try {
+ ProgressIndexType.IOT_PROGRESS_INDEX.serialize(byteBuffer);
+
+ ReadWriteIOUtils.write(peerId2SearchIndex.size(), byteBuffer);
+ for (final Map.Entry<Integer, Long> entry :
peerId2SearchIndex.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+ ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+ }
+ } finally {
+ lock.readLock().unlock();
}
}
@Override
public void serialize(OutputStream stream) throws IOException {
- ProgressIndexType.IOT_CONSENSUS_INDEX.serialize(stream);
-
- ReadWriteIOUtils.write(peerId2SearchIndex.size(), stream);
- for (final Map.Entry<Integer, Long> entry : peerId2SearchIndex.entrySet())
{
- ReadWriteIOUtils.write(entry.getKey(), stream);
- ReadWriteIOUtils.write(entry.getValue(), stream);
+ lock.readLock().lock();
+ try {
+ ProgressIndexType.IOT_PROGRESS_INDEX.serialize(stream);
+
+ ReadWriteIOUtils.write(peerId2SearchIndex.size(), stream);
+ for (final Map.Entry<Integer, Long> entry :
peerId2SearchIndex.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), stream);
+ ReadWriteIOUtils.write(entry.getValue(), stream);
+ }
+ } finally {
+ lock.readLock().unlock();
}
}
@Override
public boolean isAfter(ProgressIndex progressIndex) {
- if (progressIndex instanceof MinimumProgressIndex) {
- return true;
+ lock.readLock().lock();
+ try {
+ if (progressIndex instanceof MinimumProgressIndex) {
+ return true;
+ }
+
+ if (progressIndex instanceof HybridProgressIndex) {
+ return ((HybridProgressIndex)
progressIndex).isGivenProgressIndexAfterSelf(this);
+ }
+
+ if (!(progressIndex instanceof IoTProgressIndex)) {
+ return false;
+ }
+
+ final IoTProgressIndex thisIoTProgressIndex = this;
+ final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex)
progressIndex;
+ return thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
+ .noneMatch(
+ entry ->
+
!thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
+ ||
thisIoTProgressIndex.peerId2SearchIndex.get(entry.getKey())
+ <= entry.getValue());
+ } finally {
+ lock.readLock().unlock();
}
-
- if (!(progressIndex instanceof IoTProgressIndex)) {
- return false;
- }
-
- final IoTProgressIndex thisIoTProgressIndex = this;
- final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex)
progressIndex;
- return thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
- .noneMatch(
- entry ->
-
!thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
- ||
thisIoTProgressIndex.peerId2SearchIndex.get(entry.getKey())
- <= entry.getValue());
}
@Override
public boolean equals(ProgressIndex progressIndex) {
- if (!(progressIndex instanceof IoTProgressIndex)) {
- return false;
+ lock.readLock().lock();
+ try {
+ if (!(progressIndex instanceof IoTProgressIndex)) {
+ return false;
+ }
+
+ final IoTProgressIndex thisIoTProgressIndex = this;
+ final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex)
progressIndex;
+ return thisIoTProgressIndex.peerId2SearchIndex.size()
+ == thatIoTProgressIndex.peerId2SearchIndex.size()
+ && thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
+ .allMatch(
+ entry ->
+
thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
+ && thisIoTProgressIndex
+ .peerId2SearchIndex
+ .get(entry.getKey())
+ .equals(entry.getValue()));
+ } finally {
+ lock.readLock().unlock();
}
-
- final IoTProgressIndex thisIoTProgressIndex = this;
- final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex)
progressIndex;
- return thisIoTProgressIndex.peerId2SearchIndex.size()
- == thatIoTProgressIndex.peerId2SearchIndex.size()
- && thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
- .allMatch(
- entry ->
-
thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
- && thisIoTProgressIndex
- .peerId2SearchIndex
- .get(entry.getKey())
- .equals(entry.getValue()));
}
@Override
@@ -125,17 +153,27 @@ public class IoTProgressIndex implements ProgressIndex {
@Override
public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex
progressIndex) {
- if (!(progressIndex instanceof IoTProgressIndex)) {
+ lock.writeLock().lock();
+ try {
+ if (!(progressIndex instanceof IoTProgressIndex)) {
+ return ProgressIndex.blendProgressIndex(this, progressIndex);
+ }
+
+ final IoTProgressIndex thisIoTProgressIndex = this;
+ final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex)
progressIndex;
+ thatIoTProgressIndex.peerId2SearchIndex.forEach(
+ (thatK, thatV) ->
+ thisIoTProgressIndex.peerId2SearchIndex.compute(
+ thatK, (thisK, thisV) -> (thisV == null ? thatV :
Math.max(thisV, thatV))));
return this;
+ } finally {
+ lock.writeLock().unlock();
}
+ }
- final IoTProgressIndex thisIoTProgressIndex = this;
- final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex)
progressIndex;
- thatIoTProgressIndex.peerId2SearchIndex.forEach(
- (thatK, thatV) ->
- thisIoTProgressIndex.peerId2SearchIndex.compute(
- thatK, (thisK, thisV) -> (thisV == null ? thatV :
Math.max(thisV, thatV))));
- return this;
+ @Override
+ public ProgressIndexType getType() {
+ return ProgressIndexType.IOT_PROGRESS_INDEX;
}
public static IoTProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
@@ -144,7 +182,7 @@ public class IoTProgressIndex implements ProgressIndex {
for (int i = 0; i < size; i++) {
final int peerId = ReadWriteIOUtils.readInt(byteBuffer);
final long searchIndex = ReadWriteIOUtils.readLong(byteBuffer);
- ioTProgressIndex.addSearchIndex(peerId, searchIndex);
+ ioTProgressIndex.peerId2SearchIndex.put(peerId, searchIndex);
}
return ioTProgressIndex;
}
@@ -155,7 +193,7 @@ public class IoTProgressIndex implements ProgressIndex {
for (int i = 0; i < size; i++) {
final int peerId = ReadWriteIOUtils.readInt(stream);
final long searchIndex = ReadWriteIOUtils.readLong(stream);
- ioTProgressIndex.addSearchIndex(peerId, searchIndex);
+ ioTProgressIndex.peerId2SearchIndex.put(peerId, searchIndex);
}
return ioTProgressIndex;
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
index e36b990eae4..6b9c37ec5e1 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
@@ -33,12 +33,12 @@ public class MinimumProgressIndex implements ProgressIndex {
@Override
public void serialize(ByteBuffer byteBuffer) {
- ProgressIndexType.MINIMUM_CONSENSUS_INDEX.serialize(byteBuffer);
+ ProgressIndexType.MINIMUM_PROGRESS_INDEX.serialize(byteBuffer);
}
@Override
public void serialize(OutputStream stream) throws IOException {
- ProgressIndexType.MINIMUM_CONSENSUS_INDEX.serialize(stream);
+ ProgressIndexType.MINIMUM_PROGRESS_INDEX.serialize(stream);
}
@Override
@@ -72,6 +72,11 @@ public class MinimumProgressIndex implements ProgressIndex {
return progressIndex == null ? this : progressIndex;
}
+ @Override
+ public ProgressIndexType getType() {
+ return ProgressIndexType.MINIMUM_PROGRESS_INDEX;
+ }
+
public static MinimumProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
return new MinimumProgressIndex();
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
new file mode 100644
index 00000000000..d2742acdb71
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RecoverProgressIndex implements ProgressIndex {
+
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private final Map<Integer, SimpleProgressIndex> dataNodeId2LocalIndex;
+
+ public RecoverProgressIndex() {
+ this.dataNodeId2LocalIndex = new HashMap<>();
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ lock.readLock().lock();
+ try {
+ ProgressIndexType.RECOVER_PROGRESS_INDEX.serialize(byteBuffer);
+
+ ReadWriteIOUtils.write(dataNodeId2LocalIndex.size(), byteBuffer);
+ for (final Map.Entry<Integer, SimpleProgressIndex> entry :
dataNodeId2LocalIndex.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+ entry.getValue().serialize(byteBuffer);
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ lock.readLock().lock();
+ try {
+ ProgressIndexType.RECOVER_PROGRESS_INDEX.serialize(stream);
+
+ ReadWriteIOUtils.write(dataNodeId2LocalIndex.size(), stream);
+ for (final Map.Entry<Integer, SimpleProgressIndex> entry :
dataNodeId2LocalIndex.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), stream);
+ entry.getValue().serialize(stream);
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean isAfter(ProgressIndex progressIndex) {
+ lock.readLock().lock();
+ try {
+ if (progressIndex instanceof MinimumProgressIndex) {
+ return true;
+ }
+
+ if (progressIndex instanceof HybridProgressIndex) {
+ return ((HybridProgressIndex)
progressIndex).isGivenProgressIndexAfterSelf(this);
+ }
+
+ if (!(progressIndex instanceof RecoverProgressIndex)) {
+ return false;
+ }
+
+ final RecoverProgressIndex thisRecoverProgressIndex = this;
+ final RecoverProgressIndex thatRecoverProgressIndex =
(RecoverProgressIndex) progressIndex;
+ return thatRecoverProgressIndex.dataNodeId2LocalIndex.entrySet().stream()
+ .noneMatch(
+ entry ->
+
!thisRecoverProgressIndex.dataNodeId2LocalIndex.containsKey(entry.getKey())
+ || !thisRecoverProgressIndex
+ .dataNodeId2LocalIndex
+ .get(entry.getKey())
+ .isAfter(entry.getValue()));
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean equals(ProgressIndex progressIndex) {
+ lock.readLock().lock();
+ try {
+ if (!(progressIndex instanceof RecoverProgressIndex)) {
+ return false;
+ }
+
+ final RecoverProgressIndex thisRecoverProgressIndex = this;
+ final RecoverProgressIndex thatRecoverProgressIndex =
(RecoverProgressIndex) progressIndex;
+ return thisRecoverProgressIndex.dataNodeId2LocalIndex.size()
+ == thatRecoverProgressIndex.dataNodeId2LocalIndex.size()
+ && thatRecoverProgressIndex.dataNodeId2LocalIndex.entrySet().stream()
+ .allMatch(
+ entry ->
+
thisRecoverProgressIndex.dataNodeId2LocalIndex.containsKey(entry.getKey())
+ && thisRecoverProgressIndex
+ .dataNodeId2LocalIndex
+ .get(entry.getKey())
+ .equals(entry.getValue()));
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof RecoverProgressIndex)) {
+ return false;
+ }
+ return this.equals((RecoverProgressIndex) obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex
progressIndex) {
+ lock.writeLock().lock();
+ try {
+ if (!(progressIndex instanceof RecoverProgressIndex)) {
+ return ProgressIndex.blendProgressIndex(this, progressIndex);
+ }
+
+ final RecoverProgressIndex thisRecoverProgressIndex = this;
+ final RecoverProgressIndex thatRecoverProgressIndex =
(RecoverProgressIndex) progressIndex;
+ thatRecoverProgressIndex.dataNodeId2LocalIndex.forEach(
+ (thatK, thatV) ->
+ thisRecoverProgressIndex.dataNodeId2LocalIndex.compute(
+ thatK,
+ (thisK, thisV) ->
+ (thisV == null
+ ? thatV
+ : (SimpleProgressIndex)
+
thisV.updateToMinimumIsAfterProgressIndex(thatV))));
+ return this;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public ProgressIndexType getType() {
+ return ProgressIndexType.RECOVER_PROGRESS_INDEX;
+ }
+
+ public static RecoverProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+ final RecoverProgressIndex recoverProgressIndex = new
RecoverProgressIndex();
+ final int size = ReadWriteIOUtils.readInt(byteBuffer);
+ for (int i = 0; i < size; i++) {
+ final int dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
+ final SimpleProgressIndex simpleProgressIndex =
+ SimpleProgressIndex.deserializeFrom(byteBuffer);
+ recoverProgressIndex.dataNodeId2LocalIndex.put(dataNodeId,
simpleProgressIndex);
+ }
+ return recoverProgressIndex;
+ }
+
+ public static RecoverProgressIndex deserializeFrom(InputStream stream)
throws IOException {
+ final RecoverProgressIndex recoverProgressIndex = new
RecoverProgressIndex();
+ final int size = ReadWriteIOUtils.readInt(stream);
+ for (int i = 0; i < size; i++) {
+ final int dataNodeId = ReadWriteIOUtils.readInt(stream);
+ final SimpleProgressIndex simpleProgressIndex =
SimpleProgressIndex.deserializeFrom(stream);
+ recoverProgressIndex.dataNodeId2LocalIndex.put(dataNodeId,
simpleProgressIndex);
+ }
+ return recoverProgressIndex;
+ }
+
+ @Override
+ public String toString() {
+ return "RecoverProgressIndex{" + "dataNodeId2LocalIndex=" +
dataNodeId2LocalIndex + '}';
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
index 6571d90759a..eaa80096823 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -27,9 +27,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SimpleProgressIndex implements ProgressIndex {
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
private final int rebootTimes;
private final long memtableFlushOrderId;
@@ -40,54 +43,78 @@ public class SimpleProgressIndex implements ProgressIndex {
@Override
public void serialize(ByteBuffer byteBuffer) {
- ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(byteBuffer);
-
- ReadWriteIOUtils.write(rebootTimes, byteBuffer);
- ReadWriteIOUtils.write(memtableFlushOrderId, byteBuffer);
+ lock.readLock().lock();
+ try {
+ ProgressIndexType.SIMPLE_PROGRESS_INDEX.serialize(byteBuffer);
+
+ ReadWriteIOUtils.write(rebootTimes, byteBuffer);
+ ReadWriteIOUtils.write(memtableFlushOrderId, byteBuffer);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public void serialize(OutputStream stream) throws IOException {
- ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(stream);
-
- ReadWriteIOUtils.write(rebootTimes, stream);
- ReadWriteIOUtils.write(memtableFlushOrderId, stream);
+ lock.readLock().lock();
+ try {
+ ProgressIndexType.SIMPLE_PROGRESS_INDEX.serialize(stream);
+
+ ReadWriteIOUtils.write(rebootTimes, stream);
+ ReadWriteIOUtils.write(memtableFlushOrderId, stream);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public boolean isAfter(ProgressIndex progressIndex) {
- if (progressIndex instanceof MinimumProgressIndex) {
- return true;
- }
-
- if (!(progressIndex instanceof SimpleProgressIndex)) {
- return false;
- }
-
- final SimpleProgressIndex thisSimpleProgressIndex = this;
- final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex)
progressIndex;
- if (thisSimpleProgressIndex.rebootTimes >
thatSimpleProgressIndex.rebootTimes) {
- return true;
+ lock.readLock().lock();
+ try {
+ if (progressIndex instanceof MinimumProgressIndex) {
+ return true;
+ }
+
+ if (progressIndex instanceof HybridProgressIndex) {
+ return ((HybridProgressIndex)
progressIndex).isGivenProgressIndexAfterSelf(this);
+ }
+
+ if (!(progressIndex instanceof SimpleProgressIndex)) {
+ return false;
+ }
+
+ final SimpleProgressIndex thisSimpleProgressIndex = this;
+ final SimpleProgressIndex thatSimpleProgressIndex =
(SimpleProgressIndex) progressIndex;
+ if (thisSimpleProgressIndex.rebootTimes >
thatSimpleProgressIndex.rebootTimes) {
+ return true;
+ }
+ if (thisSimpleProgressIndex.rebootTimes <
thatSimpleProgressIndex.rebootTimes) {
+ return false;
+ }
+ // thisSimpleProgressIndex.rebootTimes ==
thatSimpleProgressIndex.rebootTimes
+ return thisSimpleProgressIndex.memtableFlushOrderId
+ > thatSimpleProgressIndex.memtableFlushOrderId;
+ } finally {
+ lock.readLock().unlock();
}
- if (thisSimpleProgressIndex.rebootTimes <
thatSimpleProgressIndex.rebootTimes) {
- return false;
- }
- // thisSimpleProgressIndex.rebootTimes ==
thatSimpleProgressIndex.rebootTimes
- return thisSimpleProgressIndex.memtableFlushOrderId
- > thatSimpleProgressIndex.memtableFlushOrderId;
}
@Override
public boolean equals(ProgressIndex progressIndex) {
- if (!(progressIndex instanceof SimpleProgressIndex)) {
- return false;
+ lock.readLock().lock();
+ try {
+ if (!(progressIndex instanceof SimpleProgressIndex)) {
+ return false;
+ }
+
+ final SimpleProgressIndex thisSimpleProgressIndex = this;
+ final SimpleProgressIndex thatSimpleProgressIndex =
(SimpleProgressIndex) progressIndex;
+ return thisSimpleProgressIndex.rebootTimes ==
thatSimpleProgressIndex.rebootTimes
+ && thisSimpleProgressIndex.memtableFlushOrderId
+ == thatSimpleProgressIndex.memtableFlushOrderId;
+ } finally {
+ lock.readLock().unlock();
}
-
- final SimpleProgressIndex thisSimpleProgressIndex = this;
- final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex)
progressIndex;
- return thisSimpleProgressIndex.rebootTimes ==
thatSimpleProgressIndex.rebootTimes
- && thisSimpleProgressIndex.memtableFlushOrderId
- == thatSimpleProgressIndex.memtableFlushOrderId;
}
@Override
@@ -111,29 +138,51 @@ public class SimpleProgressIndex implements ProgressIndex
{
+ /**
+ * Returns whichever of this index and {@code progressIndex} is later.
+ *
+ * <p>{@code rebootTimes} is compared first, then {@code memtableFlushOrderId};
+ * this index is returned on a tie. A non-{@link SimpleProgressIndex} argument
+ * is handed to {@link ProgressIndex#blendProgressIndex}.
+ *
+ * @param progressIndex the index to compare with this one
+ * @return this index, {@code progressIndex}, or the blended result
+ */
@Override
public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex
progressIndex) {
- if (!(progressIndex instanceof SimpleProgressIndex)) {
+ lock.writeLock().lock();
+ try {
+ if (!(progressIndex instanceof SimpleProgressIndex)) {
+ return ProgressIndex.blendProgressIndex(this, progressIndex);
+ }
+
+ final SimpleProgressIndex thisSimpleProgressIndex = this;
+ final SimpleProgressIndex thatSimpleProgressIndex =
(SimpleProgressIndex) progressIndex;
+ if (thisSimpleProgressIndex.rebootTimes >
thatSimpleProgressIndex.rebootTimes) {
+ return this;
+ }
+ if (thisSimpleProgressIndex.rebootTimes <
thatSimpleProgressIndex.rebootTimes) {
+ return progressIndex;
+ }
+ // thisSimpleProgressIndex.rebootTimes ==
thatSimpleProgressIndex.rebootTimes
+ if (thisSimpleProgressIndex.memtableFlushOrderId
+ > thatSimpleProgressIndex.memtableFlushOrderId) {
+ return this;
+ }
+ if (thisSimpleProgressIndex.memtableFlushOrderId
+ < thatSimpleProgressIndex.memtableFlushOrderId) {
+ return progressIndex;
+ }
+ // thisSimpleProgressIndex.memtableFlushOrderId ==
+ // thatSimpleProgressIndex.memtableFlushOrderId
return this;
+ } finally {
+ // Must release here: locking again would leave the write lock held.
+ lock.writeLock().unlock();
}
+ }
- final SimpleProgressIndex thisSimpleProgressIndex = this;
- final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex)
progressIndex;
- if (thisSimpleProgressIndex.rebootTimes >
thatSimpleProgressIndex.rebootTimes) {
- return this;
- }
- if (thisSimpleProgressIndex.rebootTimes <
thatSimpleProgressIndex.rebootTimes) {
- return progressIndex;
- }
- // thisSimpleProgressIndex.rebootTimes ==
thatSimpleProgressIndex.rebootTimes
- if (thisSimpleProgressIndex.memtableFlushOrderId
- > thatSimpleProgressIndex.memtableFlushOrderId) {
- return this;
- }
- if (thisSimpleProgressIndex.memtableFlushOrderId
- < thatSimpleProgressIndex.memtableFlushOrderId) {
- return progressIndex;
- }
- // thisSimpleProgressIndex.memtableFlushOrderId ==
thatSimpleProgressIndex.memtableFlushOrderId
- return this;
+ @Override
+ public ProgressIndexType getType() {
+ return ProgressIndexType.SIMPLE_PROGRESS_INDEX;
}
public static SimpleProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
index 04cb676d633..979a8bc550b 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
@@ -88,9 +88,8 @@ public class IoTConsensusDataRegionStateMachine extends
DataRegionStateMachine {
for (IndexedConsensusRequest indexedRequest :
batchRequest.getRequests()) {
final PlanNode planNode = grabInsertNode(indexedRequest);
if (planNode instanceof ComparableConsensusRequest) {
- final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
- ioTProgressIndex.addSearchIndex(
- batchRequest.getSourcePeerId(), indexedRequest.getSearchIndex());
+ final IoTProgressIndex ioTProgressIndex =
+ new IoTProgressIndex(batchRequest.getSourcePeerId(),
indexedRequest.getSearchIndex());
((ComparableConsensusRequest)
planNode).setProgressIndex(ioTProgressIndex);
}
deserializedRequest.add(planNode);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 2c1d5cc85b5..155653dc877 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -86,6 +86,10 @@ public class PipeRuntimeAgent implements IService {
simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource);
}
+ public void assignSimpleProgressIndexForTsFileRecovery(TsFileResource
tsFileResource) {
+
simpleConsensusProgressIndexAssigner.assignSimpleProgressIndexForTsFileRecovery(tsFileResource);
+ }
+
//////////////////////////// Runtime Exception Handlers
////////////////////////////
public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException
pipeRuntimeException) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
index 648c4994f07..e3c54c6eb94 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
@@ -50,18 +50,14 @@ public class SimpleConsensusProgressIndexAssigner {
+ File.separator;
private static final String REBOOT_TIMES_FILE_NAME = "reboot_times.txt";
- private boolean isEnable = false;
+ private boolean isSimpleConsensusEnable = false;
private int rebootTimes = 0;
private final AtomicLong memtableFlushOrderId = new AtomicLong(0);
public void start() throws StartupException {
- // only works for simple consensus
- if
(!IOTDB_CONFIG.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)) {
- return;
- }
-
- isEnable = true;
+ isSimpleConsensusEnable =
+
IOTDB_CONFIG.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS);
LOGGER.info("Start SimpleConsensusProgressIndexAssigner ...");
try {
@@ -102,11 +98,16 @@ public class SimpleConsensusProgressIndexAssigner {
}
public void assignIfNeeded(TsFileResource tsFileResource) {
- if (!isEnable) {
+ if (!isSimpleConsensusEnable) {
return;
}
tsFileResource.updateProgressIndex(
new SimpleProgressIndex(rebootTimes,
memtableFlushOrderId.getAndIncrement()));
}
+
+ public void assignSimpleProgressIndexForTsFileRecovery(TsFileResource
tsFileResource) {
+ tsFileResource.updateProgressIndex(
+ new SimpleProgressIndex(rebootTimes,
memtableFlushOrderId.getAndIncrement()));
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index af67df5e1dc..fd9311738b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -495,7 +495,7 @@ public class DataNode implements DataNodeMBean {
registerManager.register(new JMXService());
JMXService.registerMBean(getInstance(), mbeanName);
- // get resources for trigger,udf...
+ // get resources for trigger,udf,pipe...
prepareResources();
Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
diff --git
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index d7629d79cb2..e3e675d2457 100644
---
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.apache.iotdb.db.wal.exception.WALRecoverException;
import org.apache.iotdb.db.wal.utils.listener.WALRecoverListener;
@@ -242,6 +243,9 @@ public class UnsealedTsFileRecoverPerformer extends
AbstractTsFileRecoverPerform
tsFileResource.updatePlanIndexes(recoveryMemTable.getMaxPlanIndex());
}
+ // set recover progress index for pipe
+
PipeAgent.runtime().assignSimpleProgressIndexForTsFileRecovery(tsFileResource);
+
// if we put following codes in if clause above, this file can be
continued writing into it
// currently, we close this file anyway
writer.endFile();
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
index 8a9957cf47a..8300091dba1 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -169,5 +170,10 @@ public class TsFileResourceProgressIndexTest {
}
return this;
}
+
+ @Override
+ public ProgressIndexType getType() {
+ throw new UnsupportedOperationException("method not implemented.");
+ }
}
}