This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4215fc8a049 Merge PipelineDistributedBarrier and 
PipelineDistributedBarrierImpl (#23714)
4215fc8a049 is described below

commit 4215fc8a049d9edd7332a1d8c67c3f342f918989
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jan 24 19:31:04 2023 +0800

    Merge PipelineDistributedBarrier and PipelineDistributedBarrierImpl (#23714)
---
 .../spi/barrier/PipelineDistributedBarrier.java    | 70 ----------------------
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  7 +--
 .../pipeline/core/job/AbstractPipelineJob.java     |  5 +-
 .../impl/BarrierMetaDataChangedEventHandler.java   |  5 +-
 .../PipelineDistributedBarrier.java}               | 30 +++++-----
 ...pipeline.spi.barrier.PipelineDistributedBarrier | 18 ------
 ...tencyCheckChangedJobConfigurationProcessor.java |  5 +-
 .../MigrationChangedJobConfigurationProcessor.java |  5 +-
 .../fixture/FixturePipelineDistributedBarrier.java | 51 ----------------
 ...st.java => PipelineDistributedBarrierTest.java} | 10 ++--
 ...pipeline.spi.barrier.PipelineDistributedBarrier | 35 -----------
 11 files changed, 30 insertions(+), 211 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java
deleted file mode 100644
index ce2e93c15b9..00000000000
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.barrier;
-
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Pipeline distributed barrier.
- */
-@SingletonSPI
-public interface PipelineDistributedBarrier extends RequiredSPI {
-    
-    /**
-     * Register distributed barrier.
-     *
-     * @param barrierPath barrier path
-     * @param totalCount total count
-     */
-    void register(String barrierPath, int totalCount);
-    
-    /**
-     * Persist ephemeral children node.
-     *
-     * @param barrierPath barrier path
-     * @param shardingItem sharding item
-     */
-    void persistEphemeralChildrenNode(String barrierPath, int shardingItem);
-    
-    /**
-     * Persist ephemeral children node.
-     *
-     * @param barrierPath barrier path
-     */
-    void unregister(String barrierPath);
-    
-    /**
-     * Await barrier path all children node is ready.
-     *
-     * @param barrierPath barrier path
-     * @param timeout timeout
-     * @param timeUnit time unit
-     * @return true if the count reached zero and false if the waiting time 
elapsed before the count reached zero
-     */
-    boolean await(String barrierPath, long timeout, TimeUnit timeUnit);
-    
-    /**
-     * notify children node count check.
-     *
-     * @param nodePath node path
-     */
-    void notifyChildrenNodeCountCheck(String nodePath);
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 8a089723540..3dc039a6a0f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -34,12 +34,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.UnsupportedModeTypeException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import 
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import 
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.time.LocalDateTime;
@@ -118,7 +117,7 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     @Override
     public void startDisabledJob(final String jobId) {
-        PipelineDistributedBarrier pipelineDistributedBarrier = 
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class);
+        PipelineDistributedBarrier pipelineDistributedBarrier = new 
PipelineDistributedBarrier();
         
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () 
-> new PipelineJobHasAlreadyStartedException(jobId));
@@ -134,7 +133,7 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     @Override
     public void stop(final String jobId) {
-        PipelineDistributedBarrier pipelineDistributedBarrier = 
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class);
+        PipelineDistributedBarrier pipelineDistributedBarrier = new 
PipelineDistributedBarrier();
         
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         jobConfigPOJO.setDisabled(true);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 7b90cc229d1..4b7de5b0df4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -28,9 +28,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import 
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
-import 
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
 import java.util.ArrayList;
@@ -101,7 +100,7 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
             return false;
         }
         
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), 
shardingItem);
-        
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class).persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()),
 shardingItem);
+        new 
PipelineDistributedBarrier().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()),
 shardingItem);
         return true;
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
index c05b2fc6ab5..36591f27f68 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
@@ -19,8 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler
 
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
-import 
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
-import 
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
@@ -39,7 +38,7 @@ public final class BarrierMetaDataChangedEventHandler 
implements PipelineMetaDat
     @Override
     public void handle(final DataChangedEvent event) {
         if (event.getType() == Type.ADDED) {
-            
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class).notifyChildrenNodeCountCheck(event.getKey());
+            new 
PipelineDistributedBarrier().notifyChildrenNodeCountCheck(event.getKey());
         }
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
similarity index 83%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index eb05f3c323e..2ecfd906199 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.impl;
+package org.apache.shardingsphere.data.pipeline.core.util;
 
 import com.google.common.base.Strings;
 import lombok.Getter;
@@ -24,11 +24,9 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
-import 
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -38,7 +36,7 @@ import java.util.concurrent.TimeUnit;
  * Pipeline distributed barrier.
  */
 @Slf4j
-public final class PipelineDistributedBarrierImpl implements 
PipelineDistributedBarrier {
+public final class PipelineDistributedBarrier {
     
     private static final LazyInitializer<ClusterPersistRepository> 
REPOSITORY_LAZY_INITIALIZER = new LazyInitializer<ClusterPersistRepository>() {
         
@@ -48,7 +46,7 @@ public final class PipelineDistributedBarrierImpl implements 
PipelineDistributed
         }
     };
     
-    private final Map<String, InnerCountDownLatchHolder> countDownLatchMap = 
new ConcurrentHashMap<>();
+    private final Map<String, InnerCountDownLatchHolder> countDownLatchHolders 
= new ConcurrentHashMap<>();
     
     @SneakyThrows(ConcurrentException.class)
     private static ClusterPersistRepository getRepository() {
@@ -56,14 +54,14 @@ public final class PipelineDistributedBarrierImpl 
implements PipelineDistributed
     }
     
     /**
-     * Register count down latch.
+     * Register distributed barrier.
      *
      * @param barrierPath barrier path
      * @param totalCount total count
      */
     public void register(final String barrierPath, final int totalCount) {
         getRepository().persist(barrierPath, "");
-        countDownLatchMap.computeIfAbsent(barrierPath, k -> new 
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
+        countDownLatchHolders.computeIfAbsent(barrierPath, key -> new 
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
     }
     
     /**
@@ -88,7 +86,7 @@ public final class PipelineDistributedBarrierImpl implements 
PipelineDistributed
      */
     public void unregister(final String barrierPath) {
         getRepository().delete(String.join("/", barrierPath));
-        InnerCountDownLatchHolder holder = 
countDownLatchMap.remove(barrierPath);
+        InnerCountDownLatchHolder holder = 
countDownLatchHolders.remove(barrierPath);
         if (null != holder) {
             holder.getCountDownLatch().countDown();
         }
@@ -103,7 +101,7 @@ public final class PipelineDistributedBarrierImpl 
implements PipelineDistributed
      * @return true if the count reached zero and false if the waiting time 
elapsed before the count reached zero
      */
     public boolean await(final String barrierPath, final long timeout, final 
TimeUnit timeUnit) {
-        InnerCountDownLatchHolder holder = countDownLatchMap.get(barrierPath);
+        InnerCountDownLatchHolder holder = 
countDownLatchHolders.get(barrierPath);
         if (null == holder) {
             return false;
         }
@@ -118,18 +116,18 @@ public final class PipelineDistributedBarrierImpl 
implements PipelineDistributed
         return false;
     }
     
-    @Override
+    /**
+     * notify children node count check.
+     *
+     * @param nodePath node path
+     */
     public void notifyChildrenNodeCountCheck(final String nodePath) {
         if (Strings.isNullOrEmpty(nodePath)) {
             return;
         }
         String barrierPath = nodePath.substring(0, nodePath.lastIndexOf("/"));
-        InnerCountDownLatchHolder holder = countDownLatchMap.get(barrierPath);
-        if (null == holder) {
-            return;
-        }
-        List<String> childrenKeys = 
getRepository().getChildrenKeys(barrierPath);
-        if (childrenKeys.size() == holder.getTotalCount()) {
+        InnerCountDownLatchHolder holder = 
countDownLatchHolders.get(barrierPath);
+        if (null != holder && 
getRepository().getChildrenKeys(barrierPath).size() == holder.getTotalCount()) {
             holder.getCountDownLatch().countDown();
         }
     }
diff --git 
a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
 
b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
deleted file mode 100644
index abb700ae1d4..00000000000
--- 
a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.core.spi.impl.PipelineDistributedBarrierImpl
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
index 56d7dc6fdce..7faf007c6ee 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -24,11 +24,10 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
-import 
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import 
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
 import java.util.Collection;
@@ -46,7 +45,7 @@ public final class 
ConsistencyCheckChangedJobConfigurationProcessor implements P
         if (jobConfig.isDisabled()) {
             Collection<Integer> shardingItems = 
PipelineJobCenter.getShardingItems(jobId);
             PipelineJobCenter.stop(jobId);
-            PipelineDistributedBarrier pipelineDistributedBarrier = 
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class);
+            PipelineDistributedBarrier pipelineDistributedBarrier = new 
PipelineDistributedBarrier();
             for (Integer each : shardingItems) {
                 
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
 each);
             }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
index ad3366fe6ba..d9181fcdb18 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
@@ -23,14 +23,13 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.MigrationJobPreparer;
-import 
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import 
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
 import java.util.Collection;
@@ -48,7 +47,7 @@ public final class MigrationChangedJobConfigurationProcessor 
implements Pipeline
         if (jobConfig.isDisabled()) {
             Collection<Integer> shardingItems = 
PipelineJobCenter.getShardingItems(jobId);
             PipelineJobCenter.stop(jobId);
-            PipelineDistributedBarrier pipelineDistributedBarrier = 
RequiredSPIRegistry.getService(PipelineDistributedBarrier.class);
+            PipelineDistributedBarrier pipelineDistributedBarrier = new 
PipelineDistributedBarrier();
             for (Integer each : shardingItems) {
                 
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
 each);
             }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java
deleted file mode 100644
index 9b26d64dea4..00000000000
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-
-import 
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
-
-import java.util.concurrent.TimeUnit;
-
-public final class FixturePipelineDistributedBarrier implements 
PipelineDistributedBarrier {
-    
-    @Override
-    public void register(final String barrierPath, final int totalCount) {
-    }
-    
-    @Override
-    public void persistEphemeralChildrenNode(final String barrierPath, final 
int shardingItem) {
-    }
-    
-    @Override
-    public void unregister(final String barrierPath) {
-    }
-    
-    @Override
-    public boolean await(final String barrierPath, final long timeout, final 
TimeUnit timeUnit) {
-        return true;
-    }
-    
-    @Override
-    public void notifyChildrenNodeCountCheck(final String nodePath) {
-    }
-    
-    @Override
-    public boolean isDefault() {
-        return true;
-    }
-}
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
similarity index 87%
rename from 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
rename to 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index c4762da99df..8cf105b9e43 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.core.util;
 
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.impl.PipelineDistributedBarrierImpl;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -32,7 +32,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-public final class PipelineDistributedBarrierImplTest {
+public final class PipelineDistributedBarrierTest {
     
     @BeforeClass
     public static void setUp() {
@@ -44,10 +44,10 @@ public final class PipelineDistributedBarrierImplTest {
         String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
         PersistRepository repository = 
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
         repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
-        PipelineDistributedBarrierImpl instance = new 
PipelineDistributedBarrierImpl();
+        PipelineDistributedBarrier instance = new PipelineDistributedBarrier();
         String parentPath = "/barrier";
         instance.register(parentPath, 1);
-        Map<?, ?> countDownLatchMap = (Map<?, ?>) 
Plugins.getMemberAccessor().get(PipelineDistributedBarrierImpl.class.getDeclaredField("countDownLatchMap"),
 instance);
+        Map<?, ?> countDownLatchMap = (Map<?, ?>) 
Plugins.getMemberAccessor().get(PipelineDistributedBarrier.class.getDeclaredField("countDownLatchHolders"),
 instance);
         assertNotNull(countDownLatchMap);
         assertTrue(countDownLatchMap.containsKey(parentPath));
         instance.unregister(parentPath);
@@ -59,7 +59,7 @@ public final class PipelineDistributedBarrierImplTest {
         String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
         PersistRepository repository = 
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
         repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
-        PipelineDistributedBarrierImpl instance = new 
PipelineDistributedBarrierImpl();
+        PipelineDistributedBarrier instance = new PipelineDistributedBarrier();
         String barrierEnablePath = 
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
         instance.register(barrierEnablePath, 1);
         instance.persistEphemeralChildrenNode(barrierEnablePath, 1);
diff --git 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
 
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
deleted file mode 100644
index afc648f305a..00000000000
--- 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixturePipelineDistributedBarrier

Reply via email to