This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7782e303e2c The method of start scaling and stop scaling change async
to sync (#20222)
7782e303e2c is described below
commit 7782e303e2c3c5e957de35c8646df04abd005805
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Aug 17 11:49:09 2022 +0800
The method of start scaling and stop scaling change async to sync (#20222)
---
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 12 ++
.../pipeline/core/execute/PipelineJobExecutor.java | 11 +-
.../core/metadata/node/PipelineMetaDataNode.java | 20 +++
.../core/util/PipelineDistributedBarrier.java | 156 +++++++++++++++++++++
.../pipeline/scenario/migration/MigrationJob.java | 7 +
.../core/util/PipelineDistributedBarrierTest.java | 65 +++++++++
6 files changed, 270 insertions(+), 1 deletion(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 7b2797395a9..a9d8f53cfb1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -29,6 +29,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoun
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -36,6 +37,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
/**
* Abstract pipeline job API impl.
@@ -45,6 +47,8 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
protected static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ private final PipelineDistributedBarrier pipelineDistributedBarrier =
PipelineDistributedBarrier.getInstance();
+
@Override
public final String marshalJobId(final PipelineJobId pipelineJobId) {
return PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) +
marshalJobIdLeftPart(pipelineJobId);
@@ -85,6 +89,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
@Override
public void startDisabledJob(final String jobId) {
log.info("Start disabled pipeline job {}", jobId);
+
pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
if (!jobConfigPOJO.isDisabled()) {
throw new PipelineVerifyFailedException("Job is already started.");
@@ -92,16 +97,23 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
jobConfigPOJO.setDisabled(false);
jobConfigPOJO.getProps().remove("stop_time");
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+ String barrierPath =
PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId);
+ pipelineDistributedBarrier.register(barrierPath,
jobConfigPOJO.getShardingTotalCount());
+ pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
}
@Override
public void stop(final String jobId) {
log.info("Stop pipeline job {}", jobId);
+
pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
jobConfigPOJO.setDisabled(true);
jobConfigPOJO.getProps().setProperty("stop_time",
LocalDateTime.now().format(DATE_TIME_FORMATTER));
// TODO updateJobConfiguration might doesn't work
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+ String barrierPath =
PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId);
+ pipelineDistributedBarrier.register(barrierPath,
jobConfigPOJO.getShardingTotalCount());
+ pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 7c050174342..b8d93725514 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
@@ -32,6 +33,7 @@ import
org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
@@ -46,11 +48,18 @@ public final class PipelineJobExecutor extends
AbstractLifecycleExecutor {
private static final Pattern CONFIG_PATTERN =
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT +
"/(j\\d{2}[0-9a-f]+)/config");
+ private static final Pattern BARRIER_MATCH_PATTERN =
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT +
"/(j\\d{2}[0-9a-f]+)/barrier/(enable|disable)/\\d+");
+
private final ExecutorService executor = Executors.newFixedThreadPool(20);
@Override
protected void doStart() {
-
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT,
event -> getJobConfigPOJO(event).ifPresent(optional -> processEvent(event,
optional)));
+
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT,
event -> {
+ if (BARRIER_MATCH_PATTERN.matcher(event.getKey()).matches() &&
event.getType() == Type.ADDED) {
+
PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
+ }
+ getJobConfigPOJO(event).ifPresent(optional -> processEvent(event,
optional));
+ });
}
private Optional<JobConfigurationPOJO> getJobConfigPOJO(final
DataChangedEvent event) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index b31a0064bed..9e75d46ef13 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -87,4 +87,24 @@ public final class PipelineMetaDataNode {
public static String getScalingCheckResultPath(final String jobId) {
return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "check", "result");
}
+
+ /**
+ * Get scaling job barrier enable path.
+ *
+ * @param jobId job id
+ * @return job barrier path.
+ */
+ public static String getScalingJobBarrierEnablePath(final String jobId) {
+ return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "barrier", "enable");
+ }
+
+ /**
+ * Get scaling job barrier disable path.
+ *
+ * @param jobId job id
+ * @return job barrier path.
+ */
+ public static String getScalingJobBarrierDisablePath(final String jobId) {
+ return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "barrier", "disable");
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
new file mode 100644
index 00000000000..038fe1f90c7
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pipeline distributed barrier.
+ */
+@Slf4j
+public final class PipelineDistributedBarrier {
+
+ private static final PipelineDistributedBarrier INSTANCE = new
PipelineDistributedBarrier();
+
+ private static final LazyInitializer<ClusterPersistRepository>
REPOSITORY_LAZY_INITIALIZER = new LazyInitializer<ClusterPersistRepository>() {
+ @Override
+ protected ClusterPersistRepository initialize() {
+ return (ClusterPersistRepository)
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
+ }
+ };
+
+ private final Map<String, InnerCountDownLatchHolder> countDownLatchMap =
new ConcurrentHashMap<>();
+
+ @SneakyThrows
+ private static ClusterPersistRepository getRepository() {
+ return REPOSITORY_LAZY_INITIALIZER.get();
+ }
+
+ /**
+ * Get instance.
+ *
+ * @return instance
+ */
+ public static PipelineDistributedBarrier getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Register count down latch.
+ *
+ * @param parentPath parent path
+ * @param totalCount total count
+ */
+ public void register(final String parentPath, final int totalCount) {
+ getRepository().persist(parentPath, "");
+ countDownLatchMap.computeIfAbsent(parentPath, k -> new
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
+ }
+
+ /**
+ * Persist ephemeral children node.
+ *
+ * @param parentPath parent path
+ * @param shardingItem sharding item
+ */
+ public void persistEphemeralChildrenNode(final String parentPath, final
int shardingItem) {
+ String key = String.join("/", parentPath,
Integer.toString(shardingItem));
+ getRepository().delete(key);
+ getRepository().persistEphemeral(key, "");
+ }
+
+ /**
+ * Remove parent node and release the count down latch registered for it.
+ *
+ * @param parentPath parent path
+ */
+ public void removeParentNode(final String parentPath) {
+ getRepository().delete(String.join("/", parentPath));
+ InnerCountDownLatchHolder holder =
countDownLatchMap.remove(parentPath);
+ if (null != holder) {
+ holder.getCountDownLatch().countDown();
+ }
+ }
+
+ /**
+ * Await until all children nodes are ready.
+ *
+ * @param parentPath parent path
+ * @param timeout timeout
+ * @param timeUnit time unit
+ * @return true if the count reached zero; false if the waiting time elapsed,
the thread was interrupted, or no latch is registered for the parent path
+ */
+ public boolean await(final String parentPath, final long timeout, final
TimeUnit timeUnit) {
+ InnerCountDownLatchHolder holder = countDownLatchMap.get(parentPath);
+ if (null == holder) {
+ return false;
+ }
+ try {
+ boolean result = holder.getCountDownLatch().await(timeout,
timeUnit);
+ if (!result) {
+ log.info("await timeout, parent path: {}, timeout: {}, time
unit: {}", parentPath, timeout, timeUnit);
+ }
+ return result;
+ } catch (final InterruptedException ignored) {
+ }
+ return false;
+ }
+
+ /**
+ * Check whether children node count equals total count, and count down the latch if so.
+ *
+ * @param event event
+ */
+ public void checkChildrenNodeCount(final DataChangedEvent event) {
+ if (StringUtils.isBlank(event.getKey())) {
+ return;
+ }
+ String parentPath = event.getKey().substring(0,
event.getKey().lastIndexOf("/"));
+ InnerCountDownLatchHolder holder = countDownLatchMap.get(parentPath);
+ if (null == holder) {
+ return;
+ }
+ List<String> childrenKeys =
getRepository().getChildrenKeys(parentPath);
+ log.info("children keys: {}, total count: {}", childrenKeys,
holder.getTotalCount());
+ if (childrenKeys.size() == holder.getTotalCount()) {
+ holder.getCountDownLatch().countDown();
+ }
+ }
+
+ @RequiredArgsConstructor
+ @Getter
+ private static class InnerCountDownLatchHolder {
+
+ private final int totalCount;
+
+ private final CountDownLatch countDownLatch;
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 001da102255..f730c8f3758 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -31,7 +31,9 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredExc
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -44,6 +46,8 @@ public final class MigrationJob extends AbstractPipelineJob
implements SimpleJob
private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ private final PipelineDistributedBarrier pipelineDistributedBarrier =
PipelineDistributedBarrier.getInstance();
+
// Shared by all sharding items
private final MigrationJobPreparer jobPreparer = new
MigrationJobPreparer();
@@ -72,6 +76,7 @@ public final class MigrationJob extends AbstractPipelineJob
implements SimpleJob
});
getTasksRunnerMap().put(shardingItem, tasksRunner);
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(),
shardingItem);
+
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getScalingJobBarrierEnablePath(getJobId()),
shardingItem);
}
private void prepare(final MigrationJobItemContext jobItemContext) {
@@ -105,8 +110,10 @@ public final class MigrationJob extends
AbstractPipelineJob implements SimpleJob
return;
}
log.info("stop tasks runner, jobId={}", getJobId());
+ String scalingJobBarrierDisablePath =
PipelineMetaDataNode.getScalingJobBarrierDisablePath(getJobId());
for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
each.stop();
+
pipelineDistributedBarrier.persistEphemeralChildrenNode(scalingJobBarrierDisablePath,
each.getJobItemContext().getShardingItem());
}
getTasksRunnerMap().clear();
PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
new file mode 100644
index 00000000000..94641521d09
--- /dev/null
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("rawtypes")
+public final class PipelineDistributedBarrierTest {
+
+ @BeforeClass
+ public static void setUp() {
+ PipelineContextUtil.mockModeConfigAndContextManager();
+ }
+
+ @Test
+ public void assertRegisterAndRemove() throws NoSuchFieldException,
IllegalAccessException {
+ PipelineDistributedBarrier instance =
PipelineDistributedBarrier.getInstance();
+ instance.register("/test", 1);
+ Map countDownLatchMap = ReflectionUtil.getFieldValue(instance,
"countDownLatchMap", Map.class);
+ assertNotNull(countDownLatchMap);
+ assertThat(countDownLatchMap.size(), is(1));
+ instance.removeParentNode("/test");
+ assertThat(countDownLatchMap.size(), is(0));
+ }
+
+ @Test
+ public void assertAwait() {
+ PipelineDistributedBarrier instance =
PipelineDistributedBarrier.getInstance();
+ String parentPath =
"/scaling/j0130317c3054317c7363616c696e675f626d73716c/barrier/enable";
+ instance.register(parentPath, 1);
+ instance.persistEphemeralChildrenNode(parentPath, 1);
+ boolean actual = instance.await(parentPath, 1, TimeUnit.SECONDS);
+ assertFalse(actual);
+ instance.checkChildrenNodeCount(new
DataChangedEvent("/scaling/j0130317c3054317c7363616c696e675f626d73716c/barrier/enable/1",
"", Type.ADDED));
+ actual = instance.await(parentPath, 1, TimeUnit.SECONDS);
+ assertTrue(actual);
+ }
+}