This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4215fc8a049 Merge PipelineDistributedBarrier and PipelineDistributedBarrierImpl (#23714)
4215fc8a049 is described below
commit 4215fc8a049d9edd7332a1d8c67c3f342f918989
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jan 24 19:31:04 2023 +0800
Merge PipelineDistributedBarrier and PipelineDistributedBarrierImpl (#23714)
---
.../spi/barrier/PipelineDistributedBarrier.java | 70 ----------------------
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 7 +--
.../pipeline/core/job/AbstractPipelineJob.java | 5 +-
.../impl/BarrierMetaDataChangedEventHandler.java | 5 +-
.../PipelineDistributedBarrier.java} | 30 +++++-----
...pipeline.spi.barrier.PipelineDistributedBarrier | 18 ------
...tencyCheckChangedJobConfigurationProcessor.java | 5 +-
.../MigrationChangedJobConfigurationProcessor.java | 5 +-
.../fixture/FixturePipelineDistributedBarrier.java | 51 ----------------
...st.java => PipelineDistributedBarrierTest.java} | 10 ++--
...pipeline.spi.barrier.PipelineDistributedBarrier | 35 -----------
11 files changed, 30 insertions(+), 211 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java
deleted file mode 100644
index ce2e93c15b9..00000000000
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.barrier;
-
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Pipeline distributed barrier.
- */
-@SingletonSPI
-public interface PipelineDistributedBarrier extends RequiredSPI {
-
- /**
- * Register distributed barrier.
- *
- * @param barrierPath barrier path
- * @param totalCount total count
- */
- void register(String barrierPath, int totalCount);
-
- /**
- * Persist ephemeral children node.
- *
- * @param barrierPath barrier path
- * @param shardingItem sharding item
- */
- void persistEphemeralChildrenNode(String barrierPath, int shardingItem);
-
- /**
- * Persist ephemeral children node.
- *
- * @param barrierPath barrier path
- */
- void unregister(String barrierPath);
-
- /**
- * Await barrier path all children node is ready.
- *
- * @param barrierPath barrier path
- * @param timeout timeout
- * @param timeUnit time unit
- * @return true if the count reached zero and false if the waiting time
elapsed before the count reached zero
- */
- boolean await(String barrierPath, long timeout, TimeUnit timeUnit);
-
- /**
- * notify children node count check.
- *
- * @param nodePath node path
- */
- void notifyChildrenNodeCountCheck(String nodePath);
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 8a089723540..3dc039a6a0f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -34,12 +34,11 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
import
org.apache.shardingsphere.data.pipeline.core.exception.job.UnsupportedModeTypeException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.time.LocalDateTime;
@@ -118,7 +117,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
@Override
public void startDisabledJob(final String jobId) {
- PipelineDistributedBarrier pipelineDistributedBarrier =
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class);
+ PipelineDistributedBarrier pipelineDistributedBarrier = new
PipelineDistributedBarrier();
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), ()
-> new PipelineJobHasAlreadyStartedException(jobId));
@@ -134,7 +133,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
@Override
public void stop(final String jobId) {
- PipelineDistributedBarrier pipelineDistributedBarrier =
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class);
+ PipelineDistributedBarrier pipelineDistributedBarrier = new
PipelineDistributedBarrier();
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
jobConfigPOJO.setDisabled(true);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 7b90cc229d1..4b7de5b0df4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -28,9 +28,8 @@ import
org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
-import
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
import java.util.ArrayList;
@@ -101,7 +100,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
return false;
}
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(),
shardingItem);
-
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class).persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()),
shardingItem);
+ new
PipelineDistributedBarrier().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()),
shardingItem);
return true;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
index c05b2fc6ab5..36591f27f68 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
@@ -19,8 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
-import
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
-import
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -39,7 +38,7 @@ public final class BarrierMetaDataChangedEventHandler
implements PipelineMetaDat
@Override
public void handle(final DataChangedEvent event) {
if (event.getType() == Type.ADDED) {
-
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class).notifyChildrenNodeCountCheck(event.getKey());
+ new
PipelineDistributedBarrier().notifyChildrenNodeCountCheck(event.getKey());
}
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
similarity index 83%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index eb05f3c323e..2ecfd906199 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.impl;
+package org.apache.shardingsphere.data.pipeline.core.util;
import com.google.common.base.Strings;
import lombok.Getter;
@@ -24,11 +24,9 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -38,7 +36,7 @@ import java.util.concurrent.TimeUnit;
* Pipeline distributed barrier.
*/
@Slf4j
-public final class PipelineDistributedBarrierImpl implements
PipelineDistributedBarrier {
+public final class PipelineDistributedBarrier {
private static final LazyInitializer<ClusterPersistRepository>
REPOSITORY_LAZY_INITIALIZER = new LazyInitializer<ClusterPersistRepository>() {
@@ -48,7 +46,7 @@ public final class PipelineDistributedBarrierImpl implements
PipelineDistributed
}
};
- private final Map<String, InnerCountDownLatchHolder> countDownLatchMap =
new ConcurrentHashMap<>();
+ private final Map<String, InnerCountDownLatchHolder> countDownLatchHolders
= new ConcurrentHashMap<>();
@SneakyThrows(ConcurrentException.class)
private static ClusterPersistRepository getRepository() {
@@ -56,14 +54,14 @@ public final class PipelineDistributedBarrierImpl
implements PipelineDistributed
}
/**
- * Register count down latch.
+ * Register distributed barrier.
*
* @param barrierPath barrier path
* @param totalCount total count
*/
public void register(final String barrierPath, final int totalCount) {
getRepository().persist(barrierPath, "");
- countDownLatchMap.computeIfAbsent(barrierPath, k -> new
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
+ countDownLatchHolders.computeIfAbsent(barrierPath, key -> new
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
}
/**
@@ -88,7 +86,7 @@ public final class PipelineDistributedBarrierImpl implements
PipelineDistributed
*/
public void unregister(final String barrierPath) {
getRepository().delete(String.join("/", barrierPath));
- InnerCountDownLatchHolder holder =
countDownLatchMap.remove(barrierPath);
+ InnerCountDownLatchHolder holder =
countDownLatchHolders.remove(barrierPath);
if (null != holder) {
holder.getCountDownLatch().countDown();
}
@@ -103,7 +101,7 @@ public final class PipelineDistributedBarrierImpl
implements PipelineDistributed
* @return true if the count reached zero and false if the waiting time
elapsed before the count reached zero
*/
public boolean await(final String barrierPath, final long timeout, final
TimeUnit timeUnit) {
- InnerCountDownLatchHolder holder = countDownLatchMap.get(barrierPath);
+ InnerCountDownLatchHolder holder =
countDownLatchHolders.get(barrierPath);
if (null == holder) {
return false;
}
@@ -118,18 +116,18 @@ public final class PipelineDistributedBarrierImpl
implements PipelineDistributed
return false;
}
- @Override
+ /**
+ * notify children node count check.
+ *
+ * @param nodePath node path
+ */
public void notifyChildrenNodeCountCheck(final String nodePath) {
if (Strings.isNullOrEmpty(nodePath)) {
return;
}
String barrierPath = nodePath.substring(0, nodePath.lastIndexOf("/"));
- InnerCountDownLatchHolder holder = countDownLatchMap.get(barrierPath);
- if (null == holder) {
- return;
- }
- List<String> childrenKeys =
getRepository().getChildrenKeys(barrierPath);
- if (childrenKeys.size() == holder.getTotalCount()) {
+ InnerCountDownLatchHolder holder =
countDownLatchHolders.get(barrierPath);
+ if (null != holder &&
getRepository().getChildrenKeys(barrierPath).size() == holder.getTotalCount()) {
holder.getCountDownLatch().countDown();
}
}
diff --git
a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
deleted file mode 100644
index abb700ae1d4..00000000000
---
a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.core.spi.impl.PipelineDistributedBarrierImpl
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
index 56d7dc6fdce..7faf007c6ee 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -24,11 +24,10 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
-import
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import java.util.Collection;
@@ -46,7 +45,7 @@ public final class
ConsistencyCheckChangedJobConfigurationProcessor implements P
if (jobConfig.isDisabled()) {
Collection<Integer> shardingItems =
PipelineJobCenter.getShardingItems(jobId);
PipelineJobCenter.stop(jobId);
- PipelineDistributedBarrier pipelineDistributedBarrier =
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class);
+ PipelineDistributedBarrier pipelineDistributedBarrier = new
PipelineDistributedBarrier();
for (Integer each : shardingItems) {
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
each);
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
index ad3366fe6ba..d9181fcdb18 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
@@ -23,14 +23,13 @@ import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.MigrationJobPreparer;
-import
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import java.util.Collection;
@@ -48,7 +47,7 @@ public final class MigrationChangedJobConfigurationProcessor
implements Pipeline
if (jobConfig.isDisabled()) {
Collection<Integer> shardingItems =
PipelineJobCenter.getShardingItems(jobId);
PipelineJobCenter.stop(jobId);
- PipelineDistributedBarrier pipelineDistributedBarrier =
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class);
+ PipelineDistributedBarrier pipelineDistributedBarrier = new
PipelineDistributedBarrier();
for (Integer each : shardingItems) {
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
each);
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java
deleted file mode 100644
index 9b26d64dea4..00000000000
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-
-import
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
-
-import java.util.concurrent.TimeUnit;
-
-public final class FixturePipelineDistributedBarrier implements
PipelineDistributedBarrier {
-
- @Override
- public void register(final String barrierPath, final int totalCount) {
- }
-
- @Override
- public void persistEphemeralChildrenNode(final String barrierPath, final
int shardingItem) {
- }
-
- @Override
- public void unregister(final String barrierPath) {
- }
-
- @Override
- public boolean await(final String barrierPath, final long timeout, final
TimeUnit timeUnit) {
- return true;
- }
-
- @Override
- public void notifyChildrenNodeCountCheck(final String nodePath) {
- }
-
- @Override
- public boolean isDefault() {
- return true;
- }
-}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
similarity index 87%
rename from
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
rename to
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index c4762da99df..8cf105b9e43 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.test.it.data.pipeline.core.util;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import
org.apache.shardingsphere.data.pipeline.core.spi.impl.PipelineDistributedBarrierImpl;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -32,7 +32,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-public final class PipelineDistributedBarrierImplTest {
+public final class PipelineDistributedBarrierTest {
@BeforeClass
public static void setUp() {
@@ -44,10 +44,10 @@ public final class PipelineDistributedBarrierImplTest {
String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
PersistRepository repository =
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
- PipelineDistributedBarrierImpl instance = new
PipelineDistributedBarrierImpl();
+ PipelineDistributedBarrier instance = new PipelineDistributedBarrier();
String parentPath = "/barrier";
instance.register(parentPath, 1);
- Map<?, ?> countDownLatchMap = (Map<?, ?>)
Plugins.getMemberAccessor().get(PipelineDistributedBarrierImpl.class.getDeclaredField("countDownLatchMap"),
instance);
+ Map<?, ?> countDownLatchMap = (Map<?, ?>)
Plugins.getMemberAccessor().get(PipelineDistributedBarrier.class.getDeclaredField("countDownLatchHolders"),
instance);
assertNotNull(countDownLatchMap);
assertTrue(countDownLatchMap.containsKey(parentPath));
instance.unregister(parentPath);
@@ -59,7 +59,7 @@ public final class PipelineDistributedBarrierImplTest {
String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
PersistRepository repository =
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
- PipelineDistributedBarrierImpl instance = new
PipelineDistributedBarrierImpl();
+ PipelineDistributedBarrier instance = new PipelineDistributedBarrier();
String barrierEnablePath =
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
instance.register(barrierEnablePath, 1);
instance.persistEphemeralChildrenNode(barrierEnablePath, 1);
diff --git
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
deleted file mode 100644
index afc648f305a..00000000000
---
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixturePipelineDistributedBarrier