This is an automated email from the ASF dual-hosted git repository. jt2594838 pushed a commit to branch check_consensus_before_answering_region_request in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f8f5a7f5d286d09a97bde91fa14a71067c2a699c Author: Tian Jiang <[email protected]> AuthorDate: Thu Apr 23 18:02:49 2026 +0800 Add Await and DataNodeContext --- .../impl/DataNodeInternalRPCServiceImpl.java | 6 +- .../java/org/apache/iotdb/db/service/DataNode.java | 15 +- .../db/service/DataNodeInternalRPCService.java | 12 +- .../DataNodeInternalRPCServiceImplTest.java | 8 +- .../org/apache/iotdb/commons/concurrent/Await.java | 53 ++++++ .../commons/concurrent/AwaitTimeoutException.java | 31 ++++ .../iotdb/commons/concurrent/ConditionAwaiter.java | 154 +++++++++++++++++ .../java/org/apache/iotdb/commons/AwaitTest.java | 190 +++++++++++++++++++++ 8 files changed, 461 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 9fc5fcb436c..28422bbd837 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -193,6 +193,7 @@ import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager; import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUpdateType; import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUtil; import org.apache.iotdb.db.service.DataNode; +import org.apache.iotdb.db.service.DataNode.DataNodeContext; import org.apache.iotdb.db.service.RegionMigrateService; import org.apache.iotdb.db.service.externalservice.ExternalServiceManagementService; import org.apache.iotdb.db.service.metrics.FileMetrics; @@ -416,6 +417,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); + private final DataNodeContext 
dataNodeContext; + private final ExecutorService schemaExecutor = new WrappedThreadPoolExecutor( 0, @@ -430,10 +433,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface private static final String SYSTEM = "system"; - public DataNodeInternalRPCServiceImpl() { + public DataNodeInternalRPCServiceImpl(DataNodeContext dataNodeContext) { super(); partitionFetcher = ClusterPartitionFetcher.getInstance(); schemaFetcher = ClusterSchemaFetcher.getInstance(); + this.dataNodeContext = dataNodeContext; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 7004d09872d..568f2c266e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -183,14 +183,16 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { private static final String REGISTER_INTERRUPTION = "Unexpected interruption when waiting to register to the cluster"; - private boolean schemaRegionConsensusStarted = false; - private boolean dataRegionConsensusStarted = false; + private volatile boolean schemaRegionConsensusStarted = false; + private volatile boolean dataRegionConsensusStarted = false; private static Thread watcherThread; + private DataNodeContext context; public DataNode() { super("DataNode"); // We do not init anything here, so that we can re-initialize the instance in IT. 
DataNodeHolder.INSTANCE = this; + context = new DataNodeContext(); } public static void reinitializeStatics() { @@ -934,7 +936,8 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { protected void registerInternalRPCService() throws StartupException { // Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling - registerManager.register(DataNodeInternalRPCService.getInstance()); + DataNodeInternalRPCService.getInstance().setDataNodeContext(context); + registerManager.register(DataNodeInternalRPCService.getInstance()); } // make it easier for users to extend ClientRPCServiceImpl to export more rpc services @@ -1373,4 +1376,10 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { // Empty constructor } } + + public class DataNodeContext { + public boolean isAllConsensusStarted() { + return dataRegionConsensusStarted && schemaRegionConsensusStarted; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java index 5de1041a9a0..f3bf8e507c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.protocol.thrift.handler.InternalServiceThriftHandler; import org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl; +import org.apache.iotdb.db.service.DataNode.DataNodeContext; import org.apache.iotdb.db.service.metrics.DataNodeInternalRPCServiceMetrics; import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService.Processor; import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; @@ -44,6 +45,7 @@ public class DataNodeInternalRPCService extends ThriftService private static 
final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); private final AtomicReference<DataNodeInternalRPCServiceImpl> impl = new AtomicReference<>(); + private DataNodeContext dataNodeContext; private DataNodeInternalRPCService() {} @@ -54,9 +56,9 @@ public class DataNodeInternalRPCService extends ThriftService @Override public void initTProcessor() { - impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl()); + DataNodeInternalRPCServiceImpl service = getImpl(); initSyncedServiceImpl(null); - processor = new Processor<>(impl.get()); + processor = new Processor<>(service); } @Override @@ -109,7 +111,7 @@ public class DataNodeInternalRPCService extends ThriftService } public DataNodeInternalRPCServiceImpl getImpl() { - impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl()); + impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl(dataNodeContext)); return impl.get(); } @@ -122,4 +124,8 @@ public class DataNodeInternalRPCService extends ThriftService public static DataNodeInternalRPCService getInstance() { return DataNodeInternalRPCServiceHolder.INSTANCE; } + + public void setDataNodeContext(DataNodeContext dataNodeContext) { + this.dataNodeContext = dataNodeContext; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java index 22a2d6cdc46..f601c9bd1fc 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.service; +import static org.mockito.Mockito.when; + import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; @@ -50,6 
+52,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.Cre import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode; import org.apache.iotdb.db.schemaengine.SchemaEngine; +import org.apache.iotdb.db.service.DataNode.DataNodeContext; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -79,6 +82,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import org.mockito.Mockito; public class DataNodeInternalRPCServiceImplTest { @@ -134,7 +138,9 @@ public class DataNodeInternalRPCServiceImplTest { .createLocalPeer( ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()), genSchemaRegionPeerList(regionReplicaSet)); - dataNodeInternalRPCServiceImpl = new DataNodeInternalRPCServiceImpl(); + DataNodeContext context = Mockito.mock(DataNodeContext.class); + when(context.isAllConsensusStarted()).thenReturn(true); + dataNodeInternalRPCServiceImpl = new DataNodeInternalRPCServiceImpl(context); } @After diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/Await.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/Await.java new file mode 100644 index 00000000000..f08ccd17495 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/Await.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.concurrent; + +/** + * Lightweight polling utility for production code. Provides a fluent API similar to Awaitility for + * waiting until a condition becomes true. + * + * <pre>{@code + * // Wait with timeout + * Await.await() + * .atMost(5, TimeUnit.SECONDS) + * .pollInterval(100, TimeUnit.MILLISECONDS) + * .until(() -> isReady()); + * + * // Wait forever (use with caution) + * Await.await() + * .forever() + * .pollInterval(1, TimeUnit.SECONDS) + * .until(() -> isReady()); + * + * // Ignore exceptions during polling + * Await.await() + * .atMost(30, TimeUnit.SECONDS) + * .ignoreExceptions() + * .until(() -> tryConnect()); + * }</pre> + */ +public final class Await { + + private Await() {} + + public static ConditionAwaiter await() { + return new ConditionAwaiter(); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/AwaitTimeoutException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/AwaitTimeoutException.java new file mode 100644 index 00000000000..b0d5c98bfe7 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/AwaitTimeoutException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.concurrent; + +public class AwaitTimeoutException extends RuntimeException { + + public AwaitTimeoutException(String message) { + super(message); + } + + public AwaitTimeoutException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ConditionAwaiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ConditionAwaiter.java new file mode 100644 index 00000000000..f88db57f612 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ConditionAwaiter.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.concurrent; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +public class ConditionAwaiter { + + private static final long DEFAULT_POLL_INTERVAL_MS = 100; + private static final long DEFAULT_TIMEOUT_MS = 10_000; + + private long timeoutMs = DEFAULT_TIMEOUT_MS; + private long pollIntervalMs = DEFAULT_POLL_INTERVAL_MS; + private long pollDelayMs = 0; + private boolean ignoreAllExceptions = false; + private boolean forever = false; + private final List<Class<? extends Exception>> ignoredExceptions = new ArrayList<>(); + + ConditionAwaiter() {} + + public ConditionAwaiter atMost(long time, TimeUnit unit) { + this.timeoutMs = unit.toMillis(time); + return this; + } + + public ConditionAwaiter pollInterval(long time, TimeUnit unit) { + this.pollIntervalMs = unit.toMillis(time); + return this; + } + + public ConditionAwaiter pollDelay(long time, TimeUnit unit) { + this.pollDelayMs = unit.toMillis(time); + return this; + } + + public ConditionAwaiter ignoreExceptions() { + this.ignoreAllExceptions = true; + return this; + } + + public ConditionAwaiter ignoreException(Class<? 
extends Exception> exceptionType) { + this.ignoredExceptions.add(exceptionType); + return this; + } + + public ConditionAwaiter forever() { + this.forever = true; + return this; + } + + public void until(Callable<Boolean> conditionEvaluator) { + long startTime = System.nanoTime(); + + if (pollDelayMs > 0) { + sleep(pollDelayMs); + } + + Exception lastException = null; + while (true) { + try { + Boolean result = conditionEvaluator.call(); + if (Boolean.TRUE.equals(result)) { + return; + } + lastException = null; + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + throw new AwaitTimeoutException("Interrupted while awaiting condition", e); + } else if (shouldIgnore(e)) { + lastException = e; + } else { + throw new AwaitTimeoutException("Exception while evaluating condition", e); + } + } + + if (!forever && TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) >= timeoutMs) { + String message = String.format("Condition was not met within %d ms", timeoutMs); + if (lastException != null) { + throw new AwaitTimeoutException(message, lastException); + } + throw new AwaitTimeoutException(message); + } + + sleep(pollIntervalMs); + } + } + + public void untilAsserted(Runnable assertion) { + final AssertionErrorHolder holder = new AssertionErrorHolder(); + try { + until( + () -> { + try { + assertion.run(); + return true; + } catch (AssertionError e) { + holder.error = e; + return false; + } + }); + } catch (AwaitTimeoutException e) { + if (holder.error != null) { + throw new AwaitTimeoutException(e.getMessage(), holder.error); + } + throw e; + } + } + + private static final class AssertionErrorHolder { + AssertionError error; + } + + private boolean shouldIgnore(Exception e) { + if (ignoreAllExceptions) { + return true; + } + for (Class<? 
extends Exception> ignoredType : ignoredExceptions) { + if (ignoredType.isInstance(e)) { + return true; + } + } + return false; + } + + private static void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AwaitTimeoutException("Interrupted while awaiting condition", e); + } + } +} diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/AwaitTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/AwaitTest.java new file mode 100644 index 00000000000..ae094c1a806 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/AwaitTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons; + +import org.apache.iotdb.commons.concurrent.Await; +import org.apache.iotdb.commons.concurrent.AwaitTimeoutException; + +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class AwaitTest { + + @Test + public void testConditionAlreadyTrue() { + Await.await().atMost(1, TimeUnit.SECONDS).until(() -> true); + } + + @Test + public void testConditionBecomesTrue() { + AtomicBoolean flag = new AtomicBoolean(false); + new Thread( + () -> { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + flag.set(true); + }) + .start(); + + Await.await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(50, TimeUnit.MILLISECONDS) + .until(flag::get); + + assertTrue(flag.get()); + } + + @Test(expected = AwaitTimeoutException.class) + public void testTimeout() { + Await.await() + .atMost(300, TimeUnit.MILLISECONDS) + .pollInterval(50, TimeUnit.MILLISECONDS) + .until(() -> false); + } + + @Test + public void testPollDelay() { + long start = System.currentTimeMillis(); + + Await.await() + .atMost(5, TimeUnit.SECONDS) + .pollDelay(200, TimeUnit.MILLISECONDS) + .until(() -> true); + + long elapsed = System.currentTimeMillis() - start; + assertTrue("Expected at least 200ms delay, got " + elapsed, elapsed >= 180); + } + + @Test + public void testIgnoreAllExceptions() { + AtomicInteger counter = new AtomicInteger(0); + + Await.await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(50, TimeUnit.MILLISECONDS) + .ignoreExceptions() + .until( + () -> { + int val = counter.incrementAndGet(); + if (val < 3) { + throw new RuntimeException("not ready yet"); + } + return true; + }); + + assertTrue(counter.get() >= 3); + } + + @Test + public void 
testIgnoreSpecificException() { + AtomicInteger counter = new AtomicInteger(0); + + Await.await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(50, TimeUnit.MILLISECONDS) + .ignoreException(IllegalStateException.class) + .until( + () -> { + int val = counter.incrementAndGet(); + if (val < 3) { + throw new IllegalStateException("not ready"); + } + return true; + }); + + assertTrue(counter.get() >= 3); + } + + @Test + public void testNonIgnoredExceptionPropagates() { + try { + Await.await() + .atMost(5, TimeUnit.SECONDS) + .ignoreException(IllegalStateException.class) + .until( + () -> { + throw new IllegalArgumentException("unexpected"); + }); + fail("Should have thrown"); + } catch (AwaitTimeoutException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException); + } + } + + @Test + public void testUntilAsserted() { + AtomicInteger value = new AtomicInteger(0); + new Thread( + () -> { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + value.set(42); + }) + .start(); + + Await.await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(50, TimeUnit.MILLISECONDS) + .untilAsserted(() -> assertEquals(42, value.get())); + } + + @Test + public void testForever() { + AtomicInteger counter = new AtomicInteger(0); + + Await.await() + .forever() + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> counter.incrementAndGet() >= 5); + + assertTrue(counter.get() >= 5); + } + + @Test + public void testTimeoutMessageIncludesLastException() { + try { + Await.await() + .atMost(200, TimeUnit.MILLISECONDS) + .pollInterval(50, TimeUnit.MILLISECONDS) + .ignoreExceptions() + .until( + () -> { + throw new RuntimeException("still failing"); + }); + fail("Should have thrown"); + } catch (AwaitTimeoutException e) { + assertTrue(e.getCause() instanceof RuntimeException); + assertEquals("still failing", e.getCause().getMessage()); + } + } +}
