This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ef338f  add unit test for shardingsphere-scaling (#6963)
4ef338f is described below

commit 4ef338fcb2f7b6974441787903f36b1ae3f1a705
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Fri Aug 21 17:33:53 2020 +0800

    add unit test for shardingsphere-scaling (#6963)
    
    * add unit test for shardingsphere-scaling-core
    
    * add generic for ResponseContent
    
    * add unit test
    
    * add final keyword
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/utils/ResponseContentUtil.java         |  10 +-
 .../scaling/utils/ReflectionUtil.java              |  71 ++++++++++++++
 .../scaling/utils/ResponseContentUtilTest.java     |  47 +++++++++
 .../scaling/web/HttpServerHandlerTest.java         |  50 ++++++----
 .../scaling/web/HttpServerInitializerTest.java     |  54 ++++++++++
 .../engine/ShardingScalingExecuteEngine.java       |   9 +-
 .../core/execute/executor/channel/Channel.java     |   2 +-
 .../execute/executor/channel/MemoryChannel.java    |   7 +-
 .../executor/dumper/AbstractJDBCDumper.java        |   8 +-
 ...osition.java => FinishedInventoryPosition.java} |   4 +-
 ...tion.java => PlaceholderInventoryPosition.java} |   4 +-
 .../resume/AbstractResumeBreakPointManager.java    |   6 +-
 .../splitter/InventoryDataTaskSplitter.java        |   4 +-
 .../scaling/core/metadata/JdbcUri.java             |   2 +-
 .../scaling/core/utils/InventoryPositionUtil.java  |   4 +-
 .../ThreadUtil.java}                               |  27 +++--
 .../engine/ShardingScalingExecuteEngineTest.java   |  33 ++++++-
 .../executor/channel/DistributionChannelTest.java  | 109 +++++++++++++++++++++
 .../core/fixture/FixtureH2ScalingEntry.java        |   2 +-
 .../scaling/core/fixture/FixtureNopManager.java    |  53 ++++++++++
 .../position/PlaceholderIncrementalPosition.java}  |   4 +-
 .../AbstractResumeBreakPointManagerTest.java       |  56 +++++++----
 .../scaling/mysql/MySQLJdbcDumper.java             |  26 ++---
 .../scaling/mysql/BinlogPositionTest.java          |   7 ++
 .../scaling/mysql/MySQLBinlogDumperTest.java}      |  19 +---
 .../scaling/mysql/MySQLDataSourceCheckerTest.java  |  29 +++++-
 .../scaling/mysql/MySQLImporterTest.java           |  56 +++++++++++
 .../scaling/mysql/MySQLJdbcDumperTest.java         | 107 ++++++++++++++++++++
 .../scaling/mysql/MySQLPositionManagerTest.java    |  12 ++-
 .../postgresql/PostgreSQLPositionManager.java      |   2 +-
 .../scaling/postgresql/PostgreSQLWalDumper.java    |  10 +-
 .../postgresql/PostgreSQLPositionManagerTest.java  |  26 +++--
 .../postgresql/PostgreSQLSqlBuilderTest.java       |  44 +++++++++
 33 files changed, 773 insertions(+), 131 deletions(-)

diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtil.java
 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtil.java
index 290b8c8..6ce7ea3 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtil.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtil.java
@@ -33,7 +33,7 @@ public final class ResponseContentUtil {
      *
      * @return response result
      */
-    public static ResponseContent success() {
+    public static ResponseContent<?> success() {
         return build(null);
     }
     
@@ -57,8 +57,8 @@ public final class ResponseContentUtil {
      * @param errorMsg error message
      * @return response result
      */
-    public static ResponseContent handleBadRequest(final String errorMsg) {
-        ResponseContent result = new ResponseContent<>();
+    public static ResponseContent<?> handleBadRequest(final String errorMsg) {
+        ResponseContent<?> result = new ResponseContent<>();
         result.setSuccess(false);
         result.setErrorCode(ResponseCode.BAD_REQUEST);
         result.setErrorMsg(errorMsg);
@@ -71,8 +71,8 @@ public final class ResponseContentUtil {
      * @param errorMsg error message
      * @return response result
      */
-    public static ResponseContent handleException(final String errorMsg) {
-        ResponseContent result = new ResponseContent<>();
+    public static ResponseContent<?> handleException(final String errorMsg) {
+        ResponseContent<?> result = new ResponseContent<>();
         result.setSuccess(false);
         result.setErrorCode(ResponseCode.SERVER_ERROR);
         result.setErrorMsg(errorMsg);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ReflectionUtil.java
 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ReflectionUtil.java
new file mode 100644
index 0000000..1bb1e69
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ReflectionUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.utils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.lang.reflect.Field;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ReflectionUtil {
+    
+    /**
+     * Get field from class.
+     *
+     * @param targetClass target class
+     * @param fieldName field name
+     * @param isDeclared is declared
+     * @return {@link Field}
+     * @throws NoSuchFieldException no such field exception
+     */
+    public static Field getFieldFromClass(final Class<?> targetClass, final 
String fieldName, final boolean isDeclared) throws NoSuchFieldException {
+        Field result;
+        if (isDeclared) {
+            result = targetClass.getDeclaredField(fieldName);
+        } else {
+            result = targetClass.getField(fieldName);
+        }
+        result.setAccessible(true);
+        return result;
+    }
+    
+    /**
+     * Get field value from instance target object.
+     *
+     * @param target target object
+     * @param fieldName field name
+     * @param valueClass expected value class
+     * @param <T> expected value class
+     * @return target filed value
+     * @throws NoSuchFieldException no such field exception
+     * @throws IllegalAccessException illegal access exception
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T getFieldValueFromClass(final Object target, final 
String fieldName, final Class<T> valueClass) throws NoSuchFieldException, 
IllegalAccessException {
+        Field field = getFieldFromClass(target.getClass(), fieldName, true);
+        Object value = field.get(target);
+        if (null == value) {
+            return null;
+        }
+        if (value.getClass().isAssignableFrom(value.getClass())) {
+            return (T) value;
+        }
+        throw new ClassCastException("field " + fieldName + " is " + 
target.getClass().getName() + " can cast to " + valueClass.getName());
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtilTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtilTest.java
new file mode 100644
index 0000000..2652107
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtilTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.utils;
+
+import org.apache.shardingsphere.scaling.web.entity.ResponseCode;
+import org.apache.shardingsphere.scaling.web.entity.ResponseContent;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public final class ResponseContentUtilTest {
+    
+    public static final String ERROR_MESSAGE = "error message.";
+    
+    @Test
+    public void assertHandleBadRequest() {
+        ResponseContent<?> responseContent = 
ResponseContentUtil.handleBadRequest(ERROR_MESSAGE);
+        assertThat(responseContent.getErrorMsg(), is(ERROR_MESSAGE));
+        assertThat(responseContent.getErrorCode(), 
is(ResponseCode.BAD_REQUEST));
+        assertNull(responseContent.getModel());
+    }
+    
+    @Test
+    public void assertHandleException() {
+        ResponseContent<?> responseContent = 
ResponseContentUtil.handleException(ERROR_MESSAGE);
+        assertThat(responseContent.getErrorMsg(), is(ERROR_MESSAGE));
+        assertThat(responseContent.getErrorCode(), 
is(ResponseCode.SERVER_ERROR));
+        assertNull(responseContent.getModel());
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
index d4e4915..8cbc268 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
@@ -45,6 +45,7 @@ import java.util.Map;
 
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -70,6 +71,14 @@ public final class HttpServerHandlerTest {
     
     @Test
     public void assertChannelReadStartSuccess() {
+        startScalingJob();
+        ArgumentCaptor<FullHttpResponse> argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
+        verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
+        FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
+        
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("{\"success\":true"));
+    }
+    
+    private void startScalingJob() {
         scalingConfiguration.getRuleConfiguration().setSourceDatasource("ds_0: 
!!" + YamlDataSourceConfiguration.class.getName() + "\n  "
                 + "dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n  
props:\n    "
                 + "jdbcUrl: 
jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL\n    
username: root\n    password: 'password'\n    connectionTimeout: 30000\n    "
@@ -80,20 +89,27 @@ public final class HttpServerHandlerTest {
         ByteBuf byteBuf = 
Unpooled.copiedBuffer(GSON.toJson(scalingConfiguration), CharsetUtil.UTF_8);
         fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
HttpMethod.POST, "/scaling/job/start", byteBuf);
         httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
-        ArgumentCaptor argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
-        verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
-        FullHttpResponse fullHttpResponse = (FullHttpResponse) 
argumentCaptor.getValue();
-        
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("{\"success\":true"));
     }
     
     @Test
-    public void assertChannelReadProgress() {
-        fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
HttpMethod.GET, "/scaling/job/progress/2");
+    public void assertChannelReadProgressFail() {
+        fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
HttpMethod.GET, "/scaling/job/progress/9");
         httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
-        ArgumentCaptor argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
+        ArgumentCaptor<FullHttpResponse> argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
         verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
-        FullHttpResponse fullHttpResponse = (FullHttpResponse) 
argumentCaptor.getValue();
-        
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("Can't
 find scaling job id 2"));
+        FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
+        
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("Can't
 find scaling job id 9"));
+    }
+    
+    @Test
+    public void assertChannelReadProgressSuccess() {
+        startScalingJob();
+        fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
HttpMethod.GET, "/scaling/job/progress/1");
+        httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
+        ArgumentCaptor<FullHttpResponse> argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
+        verify(channelHandlerContext, 
times(2)).writeAndFlush(argumentCaptor.capture());
+        FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
+        
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("{\"success\":true"));
     }
     
     @Test
@@ -103,9 +119,9 @@ public final class HttpServerHandlerTest {
         ByteBuf byteBuf = Unpooled.copiedBuffer(GSON.toJson(map), 
CharsetUtil.UTF_8);
         fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
HttpMethod.POST, "/scaling/job/stop", byteBuf);
         httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
-        ArgumentCaptor argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
+        ArgumentCaptor<FullHttpResponse> argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
         verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
-        FullHttpResponse fullHttpResponse = (FullHttpResponse) 
argumentCaptor.getValue();
+        FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
         
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("{\"success\":true"));
     }
     
@@ -113,9 +129,9 @@ public final class HttpServerHandlerTest {
     public void assertChannelReadList() {
         fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
HttpMethod.GET, "/scaling/job/list");
         httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
-        ArgumentCaptor argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
+        ArgumentCaptor<FullHttpResponse> argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
         verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
-        FullHttpResponse fullHttpResponse = (FullHttpResponse) 
argumentCaptor.getValue();
+        FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
         
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("{\"success\":true"));
     }
     
@@ -123,9 +139,9 @@ public final class HttpServerHandlerTest {
     public void assertChannelReadUnsupportedUrl() {
         fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
HttpMethod.DELETE, "/scaling/1");
         httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
-        ArgumentCaptor argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
+        ArgumentCaptor<FullHttpResponse> argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
         verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
-        FullHttpResponse fullHttpResponse = (FullHttpResponse) 
argumentCaptor.getValue();
+        FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
         
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("Not 
support request!"));
     }
     
@@ -133,9 +149,9 @@ public final class HttpServerHandlerTest {
     public void assertChannelReadUnsupportedMethod() {
         fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
HttpMethod.DELETE, "/scaling/job/stop");
         httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
-        ArgumentCaptor argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
+        ArgumentCaptor<FullHttpResponse> argumentCaptor = 
ArgumentCaptor.forClass(FullHttpResponse.class);
         verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
-        FullHttpResponse fullHttpResponse = (FullHttpResponse) 
argumentCaptor.getValue();
+        FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
         
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("Not 
support request!"));
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
new file mode 100644
index 0000000..0f87fe4
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.web;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class HttpServerInitializerTest {
+    
+    @Mock
+    private SocketChannel socketChannel;
+    
+    @Mock
+    private ChannelPipeline channelPipeline;
+    
+    @Before
+    public void setUp() {
+        when(socketChannel.pipeline()).thenReturn(channelPipeline);
+    }
+    
+    @Test
+    public void assertInitChannel() {
+        HttpServerInitializer httpServerInitializer = new 
HttpServerInitializer();
+        httpServerInitializer.initChannel(socketChannel);
+        verify(channelPipeline, times(3)).addLast(any(ChannelHandler.class));
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
index 261aba7..bba6b04 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingEx
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
@@ -73,13 +74,13 @@ public final class ShardingScalingExecuteEngine {
      * @return execute future of all
      */
     public Future<?> submitAll(final Collection<? extends 
ShardingScalingExecutor> shardingScalingExecutors, final ExecuteCallback 
executeCallback) {
-        Collection<ListenableFuture<Object>> listenableFutures = new 
ArrayList<>(shardingScalingExecutors.size());
+        Collection<ListenableFuture<?>> listenableFutures = new 
ArrayList<>(shardingScalingExecutors.size());
         for (ShardingScalingExecutor each : shardingScalingExecutors) {
-            ListenableFuture listenableFuture = executorService.submit(each);
+            ListenableFuture<?> listenableFuture = 
executorService.submit(each);
             listenableFutures.add(listenableFuture);
         }
-        ListenableFuture result = Futures.allAsList(listenableFutures);
-        Futures.addCallback(result, new 
ExecuteFutureCallback<Collection<Object>>(executeCallback), executorService);
+        ListenableFuture<List<Object>> result = 
Futures.allAsList(listenableFutures);
+        Futures.addCallback(result, new 
ExecuteFutureCallback<Collection<?>>(executeCallback), executorService);
         return result;
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/Channel.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/Channel.java
index 1b38825..6c31d2e 100755
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/Channel.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/Channel.java
@@ -38,7 +38,7 @@ public interface Channel {
      * fetch {@code Record} from channel, if the timeout also returns the 
record.
      *
      * @param batchSize record batch size
-     * @param timeout value
+     * @param timeout timeout(seconds)
      * @return record
      */
     List<Record> fetchRecords(int batchSize, int timeout);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java
index 72dd930..da20ad1 100755
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.scaling.core.execute.executor.channel;
 
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
@@ -54,11 +55,7 @@ public final class MemoryChannel implements Channel {
             if (timeout * 1000L <= System.currentTimeMillis() - start) {
                 break;
             }
-            try {
-                Thread.sleep(100L);
-            } catch (final InterruptedException ignored) {
-                break;
-            }
+            ThreadUtil.sleep(100L);
         }
         queue.drainTo(records, batchSize);
         toBeAcknowledgeRecords.addAll(records);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index a47bec2..30da411 100755
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -32,10 +32,10 @@ import 
org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import 
org.apache.shardingsphere.scaling.core.job.position.FinishedInventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import 
org.apache.shardingsphere.scaling.core.job.position.PlaceholderInventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
 import org.apache.shardingsphere.scaling.core.utils.RdbmsConfigurationUtil;
@@ -98,7 +98,7 @@ public abstract class AbstractJDBCDumper extends 
AbstractShardingScalingExecutor
                 }
                 pushRecord(record);
             }
-            pushRecord(new FinishedRecord(new FinishedPosition()));
+            pushRecord(new FinishedRecord(new FinishedInventoryPosition()));
         } catch (final SQLException ex) {
             stop();
             channel.close();
@@ -110,7 +110,7 @@ public abstract class AbstractJDBCDumper extends 
AbstractShardingScalingExecutor
     
     private InventoryPosition newInventoryPosition(final ResultSet rs) throws 
SQLException {
         if (null == inventoryDumperConfiguration.getPrimaryKey()) {
-            return new PlaceholderPosition();
+            return new PlaceholderInventoryPosition();
         }
         return new 
PrimaryKeyPosition(rs.getLong(inventoryDumperConfiguration.getPrimaryKey()), 
((PrimaryKeyPosition) 
inventoryDumperConfiguration.getPositionManager().getPosition()).getEndValue());
     }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedInventoryPosition.java
similarity index 91%
copy from 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
copy to 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedInventoryPosition.java
index 212f168..a094116 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedInventoryPosition.java
@@ -21,9 +21,9 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 
 /**
- * Finished position.
+ * Finished inventory position.
  */
-public final class FinishedPosition implements InventoryPosition {
+public final class FinishedInventoryPosition implements InventoryPosition {
     
     @Override
     public JsonElement toJson() {
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderInventoryPosition.java
similarity index 90%
rename from 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
rename to 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderInventoryPosition.java
index 416f594..e3d17e1 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderInventoryPosition.java
@@ -21,9 +21,9 @@ import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 
 /**
- * Placeholder position.
+ * Placeholder inventory position.
  */
-public final class PlaceholderPosition implements InventoryPosition {
+public final class PlaceholderInventoryPosition implements InventoryPosition {
     
     @Override
     public JsonElement toJson() {
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
index d312c7e..6e26122 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
@@ -26,7 +26,7 @@ import com.google.gson.JsonParser;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import 
org.apache.shardingsphere.scaling.core.job.position.FinishedInventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
 import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import 
org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
@@ -83,7 +83,7 @@ public abstract class AbstractResumeBreakPointManager 
implements ResumeBreakPoin
             inventoryPositionManagerMap.put(entry.getKey(), new 
InventoryPositionManager<>(entry.getValue()));
         }
         for (String each : inventoryPositions.getFinished()) {
-            inventoryPositionManagerMap.put(each, new 
InventoryPositionManager<>(new FinishedPosition()));
+            inventoryPositionManagerMap.put(each, new 
InventoryPositionManager<>(new FinishedInventoryPosition()));
         }
     }
     
@@ -103,7 +103,7 @@ public abstract class AbstractResumeBreakPointManager 
implements ResumeBreakPoin
         JsonObject unfinished = new JsonObject();
         Set<String> finished = Sets.newHashSet();
         for (Entry<String, PositionManager<InventoryPosition>> entry : 
inventoryPositionManagerMap.entrySet()) {
-            if (entry.getValue().getPosition() instanceof FinishedPosition) {
+            if (entry.getValue().getPosition() instanceof 
FinishedInventoryPosition) {
                 finished.add(entry.getKey());
                 continue;
             }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index afb75fb..3823eaf 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
 import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import 
org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import 
org.apache.shardingsphere.scaling.core.job.position.PlaceholderInventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
@@ -85,7 +85,7 @@ public final class InventoryDataTaskSplitter {
         for (String each : dumperConfiguration.getTableNameMap().keySet()) {
             InventoryDumperConfiguration dumperConfig = new 
InventoryDumperConfiguration(dumperConfiguration);
             dumperConfig.setTableName(each);
-            dumperConfig.setPositionManager(new InventoryPositionManager<>(new 
PlaceholderPosition()));
+            dumperConfig.setPositionManager(new InventoryPositionManager<>(new 
PlaceholderInventoryPosition()));
             result.add(dumperConfig);
         }
         return result;
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/metadata/JdbcUri.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/metadata/JdbcUri.java
index 14ab58b..0c88238 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/metadata/JdbcUri.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/metadata/JdbcUri.java
@@ -75,7 +75,7 @@ public final class JdbcUri {
      * @return database name
      */
     public String getDatabase() {
-        return jdbcUri.getPath().replaceFirst("/", "");
+        return null == jdbcUri.getPath() ? "" : 
jdbcUri.getPath().replaceFirst("/", "");
     }
     
     /**
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/InventoryPositionUtil.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/InventoryPositionUtil.java
index c4db6c1..20a9558 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/InventoryPositionUtil.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/InventoryPositionUtil.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.utils;
 
 import com.google.gson.Gson;
 import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import 
org.apache.shardingsphere.scaling.core.job.position.PlaceholderInventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 
 import java.util.List;
@@ -42,6 +42,6 @@ public final class InventoryPositionUtil {
         if (2 == values.size()) {
             return new PrimaryKeyPosition(values.get(0).longValue(), 
values.get(1).longValue());
         }
-        return new PlaceholderPosition();
+        return new PlaceholderInventoryPosition();
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ThreadUtil.java
similarity index 67%
copy from 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
copy to 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ThreadUtil.java
index 212f168..fda741c 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ThreadUtil.java
@@ -15,23 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.job.position;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
+package org.apache.shardingsphere.scaling.core.utils;
 
 /**
- * Finished position.
+ * Thread util.
  */
-public final class FinishedPosition implements InventoryPosition {
-    
-    @Override
-    public JsonElement toJson() {
-        return new JsonObject();
-    }
+public final class ThreadUtil {
     
-    @Override
-    public int compareTo(final Position o) {
-        return 0;
+    /**
+     * Sleep ignored InterruptedException.
+     *
+     * @param millis sleep time.
+     */
+    public static void sleep(final long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException ignored) {
+        }
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
index a464d77..2153813 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
@@ -20,8 +20,11 @@ package 
org.apache.shardingsphere.scaling.core.execute.engine;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 public final class ShardingScalingExecuteEngineTest {
@@ -31,13 +34,41 @@ public final class ShardingScalingExecuteEngineTest {
         ShardingScalingExecuteEngine executeEngine = new 
ShardingScalingExecuteEngine(2);
         try {
             for (int i = 0; i < 5; i++) {
-                executeEngine.submit(mockShardingScalingExecutor());
+                Future<?> submit = 
executeEngine.submit(mockShardingScalingExecutor());
+                assertFalse(submit.isCancelled());
             }
         } catch (final RejectedExecutionException ex) {
             fail();
         }
     }
     
+    @Test
+    public void assertSubmitAllMoreThanMaxWorkerNumber() {
+        ShardingScalingExecuteEngine executeEngine = new 
ShardingScalingExecuteEngine(2);
+        try {
+            for (int i = 0; i < 5; i++) {
+                Future<?> submit = 
executeEngine.submitAll(Collections.singletonList(mockShardingScalingExecutor()),
 mockExecuteCallback());
+                assertFalse(submit.isCancelled());
+            }
+        } catch (final RejectedExecutionException ex) {
+            fail();
+        }
+    }
+    
+    private ExecuteCallback mockExecuteCallback() {
+        return new ExecuteCallback() {
+            @Override
+            public void onSuccess() {
+        
+            }
+    
+            @Override
+            public void onFailure(final Throwable throwable) {
+        
+            }
+        };
+    }
+    
     private ShardingScalingExecutor mockShardingScalingExecutor() {
         return new ShardingScalingExecutor() {
             @Override
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannelTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannelTest.java
new file mode 100644
index 0000000..9efc85d
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannelTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.channel;
+
+import com.google.gson.JsonElement;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.record.PlaceholderRecord;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class DistributionChannelTest {
+    
+    private DistributionChannel distributionChannel;
+    
+    @Before
+    public void setUp() {
+        ScalingContext.getInstance().init(new ServerConfiguration());
+    }
+    
+    @Test
+    @SneakyThrows(InterruptedException.class)
+    public void assertAckCallbackResultSortable() {
+        distributionChannel = new DistributionChannel(2, records -> {
+            assertThat(records.size(), is(2));
+            assertTrue(((IntPosition) records.get(0).getPosition()).getId() < 
((IntPosition) records.get(1).getPosition()).getId());
+        });
+        distributionChannel.pushRecord(new PlaceholderRecord(new 
IntPosition(1)));
+        distributionChannel.pushRecord(new PlaceholderRecord(new 
IntPosition(2)));
+        fetchRecordsAndSleep(0);
+        fetchRecordsAndSleep(1);
+        invokeAckRecords0();
+    }
+    
+    private void fetchRecordsAndSleep(final int millis) {
+        new Thread(() -> {
+            distributionChannel.fetchRecords(1, 0);
+            if (millis > 0) {
+                ThreadUtil.sleep(millis);
+            }
+            distributionChannel.ack();
+        }).start();
+    }
+    
+    @Test
+    @SneakyThrows(InterruptedException.class)
+    public void assertBroadcastFinishedRecord() {
+        distributionChannel = new DistributionChannel(2, records -> 
assertThat(records.size(), is(2)));
+        distributionChannel.pushRecord(new FinishedRecord(new NopPosition()));
+    }
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void invokeAckRecords0() {
+        Method ackRecords0 = 
DistributionChannel.class.getDeclaredMethod("ackRecords0");
+        ackRecords0.setAccessible(true);
+        ackRecords0.invoke(distributionChannel);
+    }
+    
+    @After
+    public void tearDown() {
+        distributionChannel.close();
+    }
+    
+    @AllArgsConstructor
+    @Getter
+    private static class IntPosition implements Position {
+        
+        private final int id;
+        
+        @Override
+        public int compareTo(final Position position) {
+            return id - ((IntPosition) position).id;
+        }
+        
+        @Override
+        public JsonElement toJson() {
+            return null;
+        }
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index b856a2d..b1f32c5 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -39,7 +39,7 @@ public final class FixtureH2ScalingEntry implements 
ScalingEntry {
     
     @Override
     public Class<? extends PositionManager<IncrementalPosition>> 
getPositionManager() {
-        return null;
+        return FixtureNopManager.class;
     }
     
     @Override
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureNopManager.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureNopManager.java
new file mode 100644
index 0000000..e71e175
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureNopManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.fixture;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+
+import javax.sql.DataSource;
+
+@RequiredArgsConstructor
+public final class FixtureNopManager extends 
BasePositionManager<IncrementalPosition> implements 
PositionManager<IncrementalPosition> {
+    
+    private DataSource dataSource;
+    
+    public FixtureNopManager(final String position) {
+    }
+    
+    @Override
+    public IncrementalPosition getPosition() {
+        
+        return new IncrementalPosition() {
+            @Override
+            public int compareTo(final Position o) {
+                return 0;
+            }
+            
+            @Override
+            public JsonElement toJson() {
+                return new JsonObject();
+            }
+        };
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderIncrementalPosition.java
similarity index 90%
copy from 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
copy to 
shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderIncrementalPosition.java
index 212f168..03b5b66 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderIncrementalPosition.java
@@ -21,9 +21,9 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 
 /**
- * Finished position.
+ * Placeholder inventory position.
  */
-public final class FinishedPosition implements InventoryPosition {
+public final class PlaceholderIncrementalPosition implements 
IncrementalPosition {
     
     @Override
     public JsonElement toJson() {
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
similarity index 77%
rename from 
shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
rename to 
shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
index 766e99d..ba9866f 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
@@ -18,15 +18,16 @@
 package org.apache.shardingsphere.scaling.core.job.position.resume;
 
 import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import 
org.apache.shardingsphere.scaling.core.job.position.FinishedInventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
 import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import 
org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import 
org.apache.shardingsphere.scaling.core.job.position.PlaceholderIncrementalPosition;
+import 
org.apache.shardingsphere.scaling.core.job.position.PlaceholderInventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
-import org.apache.shardingsphere.scaling.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -39,7 +40,7 @@ public final class AbstractResumeBreakPointManagerTest {
     
     private AbstractResumeBreakPointManager resumeBreakPointManager;
     
-    private final String incrementalPosition = 
"{\"ds0\":{\"filename\":\"mysql-bin.000001\",\"position\":4},\"ds1\":{\"filename\":\"mysql-bin.000002\",\"position\":4}}";
+    private final String incrementalPosition = "{\"ds0\":{},\"ds1\":{}}";
     
     private final String inventoryPosition = 
"{\"unfinished\":{\"ds0.t_order_1#0\":[0,100],\"ds0.t_order_2\":[],\"ds1.t_order_1#0\":[0,200]},\"finished\":[\"ds0.t_order_1#1\"]}";
     
@@ -47,8 +48,8 @@ public final class AbstractResumeBreakPointManagerTest {
     public void setUp() throws NoSuchFieldException, IllegalAccessException {
         resumeBreakPointManager = new AbstractResumeBreakPointManager() {
         };
-        resumeBreakPointManager.setDatabaseType("MySQL");
-        resumeBreakPointManager.setTaskPath("/scalingTest/item-0");
+        resumeBreakPointManager.setDatabaseType("H2");
+        resumeBreakPointManager.setTaskPath("/");
         
ReflectionUtil.getFieldFromClass(AbstractResumeBreakPointManager.class, 
"inventoryPositionManagerMap", true)
                 .set(resumeBreakPointManager, new TreeMap<String, 
PositionManager<InventoryPosition>>());
         
ReflectionUtil.getFieldFromClass(AbstractResumeBreakPointManager.class, 
"incrementalPositionManagerMap", true)
@@ -57,20 +58,24 @@ public final class AbstractResumeBreakPointManagerTest {
     
     @Test
     public void assertResumeIncrementalPosition() {
-        resumeBreakPointManager.resumeIncrementalPosition(incrementalPosition);
-        
assertThat(resumeBreakPointManager.getIncrementalPositionManagerMap().size(), 
is(2));
+        resumeBreakPointManager.resumeInventoryPosition("");
+        
assertThat(resumeBreakPointManager.getInventoryPositionManagerMap().size(), 
is(0));
+        resumeBreakPointManager.resumeInventoryPosition(inventoryPosition);
+        
assertThat(resumeBreakPointManager.getInventoryPositionManagerMap().size(), 
is(4));
     }
     
     @Test
     public void assertResumeInventoryPosition() {
-        resumeBreakPointManager.resumeInventoryPosition(inventoryPosition);
-        
assertThat(resumeBreakPointManager.getInventoryPositionManagerMap().size(), 
is(4));
+        resumeBreakPointManager.resumeIncrementalPosition("");
+        
assertThat(resumeBreakPointManager.getIncrementalPositionManagerMap().size(), 
is(0));
+        resumeBreakPointManager.resumeIncrementalPosition(incrementalPosition);
+        
assertThat(resumeBreakPointManager.getIncrementalPositionManagerMap().size(), 
is(2));
     }
     
     @Test
     public void assertGetIncrementalPositionData() {
-        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0", 
new BasePositionManager<>(new BinlogPosition("mysql-bin.000001", 4L)));
-        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds1", 
new BasePositionManager<>(new BinlogPosition("mysql-bin.000002", 4L)));
+        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0", 
new BasePositionManager<>(new PlaceholderIncrementalPosition()));
+        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds1", 
new BasePositionManager<>(new PlaceholderIncrementalPosition()));
         assertThat(resumeBreakPointManager.getIncrementalPositionData(), 
is(incrementalPosition));
     }
     
@@ -82,22 +87,39 @@ public final class AbstractResumeBreakPointManagerTest {
     
     @Test
     public void assertPlaceholderPositionJson() {
-        
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", 
new InventoryPositionManager<>(new PlaceholderPosition()));
+        
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", 
new InventoryPositionManager<>(new PlaceholderInventoryPosition()));
         assertThat(resumeBreakPointManager.getInventoryPositionData(), 
is("{\"unfinished\":{\"ds0.t_order_1#0\":[]},\"finished\":[]}"));
+        assertThat(new PlaceholderInventoryPosition().toJson().toString(), 
is("[]"));
     }
     
     @Test
     public void assertFinishedPositionJson() {
-        
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", 
new InventoryPositionManager<>(new FinishedPosition()));
+        
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", 
new InventoryPositionManager<>(new FinishedInventoryPosition()));
         assertThat(resumeBreakPointManager.getInventoryPositionData(), 
is("{\"unfinished\":{},\"finished\":[\"ds0.t_order_1#0\"]}"));
+        assertThat(new FinishedInventoryPosition().toJson().toString(), 
is("{}"));
     }
     
     @Test
     public void assertGetInventoryPositionData() {
         
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", 
new InventoryPositionManager<>(new PrimaryKeyPosition(0L, 100L)));
-        
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", 
new InventoryPositionManager<>(new FinishedPosition()));
-        
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_2", 
new InventoryPositionManager<>(new PlaceholderPosition()));
+        
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", 
new InventoryPositionManager<>(new FinishedInventoryPosition()));
+        
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_2", 
new InventoryPositionManager<>(new PlaceholderInventoryPosition()));
         
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0", 
new InventoryPositionManager<>(new PrimaryKeyPosition(0L, 200L)));
         assertThat(resumeBreakPointManager.getInventoryPositionData(), 
is(inventoryPosition));
     }
+    
+    @Test
+    public void assertGetDatabaseType() {
+        assertThat(resumeBreakPointManager.getDatabaseType(), is("H2"));
+    }
+    
+    @Test
+    public void assertGetTaskPath() {
+        assertThat(resumeBreakPointManager.getTaskPath(), is("/"));
+    }
+    
+    @After
+    public void tearDown() {
+        resumeBreakPointManager.close();
+    }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java
index 5dda1d3..baba817 100755
--- 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java
@@ -42,6 +42,18 @@ public final class MySQLJdbcDumper extends 
AbstractJDBCDumper {
         
jdbcDataSourceConfiguration.setJdbcUrl(fixMySQLUrl(jdbcDataSourceConfiguration.getJdbcUrl()));
     }
     
+    private String fixMySQLUrl(final String url) {
+        JdbcUri uri = new JdbcUri(url);
+        return String.format("jdbc:%s://%s/%s?%s", uri.getScheme(), 
uri.getHost(), uri.getDatabase(), fixMySQLParams(uri.getParameters()));
+    }
+    
+    private String fixMySQLParams(final Map<String, String> parameters) {
+        if (!parameters.containsKey("yearIsDateType")) {
+            parameters.put("yearIsDateType", "false");
+        }
+        return formatMySQLParams(parameters);
+    }
+    
     private String formatMySQLParams(final Map<String, String> params) {
         StringBuilder result = new StringBuilder();
         for (Entry<String, String> entry : params.entrySet()) {
@@ -55,18 +67,6 @@ public final class MySQLJdbcDumper extends 
AbstractJDBCDumper {
         return result.toString();
     }
     
-    private String fixMySQLUrl(final String url) {
-        JdbcUri uri = new JdbcUri(url);
-        return String.format("jdbc:%s://%s/%s?%s", uri.getScheme(), 
uri.getHost(), uri.getDatabase(), fixMySQLParams(uri.getParameters()));
-    }
-    
-    private String fixMySQLParams(final Map<String, String> parameters) {
-        if (!parameters.containsKey("yearIsDateType")) {
-            parameters.put("yearIsDateType", "false");
-        }
-        return formatMySQLParams(parameters);
-    }
-    
     @Override
     public Object readValue(final ResultSet resultSet, final int index) throws 
SQLException {
         if (isDateTimeValue(resultSet.getMetaData().getColumnType(index))) {
@@ -83,7 +83,7 @@ public final class MySQLJdbcDumper extends AbstractJDBCDumper 
{
     @Override
     protected PreparedStatement createPreparedStatement(final Connection conn, 
final String sql) throws SQLException {
         PreparedStatement result = conn.prepareStatement(sql, 
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-        result.setFetchSize(Integer.MIN_VALUE);
+        result.setFetchSize(100);
         return result;
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/BinlogPositionTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/BinlogPositionTest.java
index b4a118c..5123396 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/BinlogPositionTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/BinlogPositionTest.java
@@ -37,5 +37,12 @@ public final class BinlogPositionTest {
         assertThat(binlogPosition.compareTo(new BinlogPosition(fileName, 10)), 
is(0));
         assertThat(binlogPosition.compareTo(new BinlogPosition(fileName, 9)), 
is(1));
         assertThat(binlogPosition.compareTo(new BinlogPosition(fileName, 11)), 
is(-1));
+        assertThat(binlogPosition.compareTo(null), is(1));
+    }
+    
+    @Test
+    public void assertToJson() {
+        BinlogPosition binlogPosition = new BinlogPosition("mysql-bin.000001", 
4);
+        assertThat(binlogPosition.toJson().toString(), 
is("{\"filename\":\"mysql-bin.000001\",\"position\":4}"));
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumperTest.java
similarity index 67%
rename from 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
rename to 
shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumperTest.java
index 212f168..47570e4 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumperTest.java
@@ -15,23 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.job.position;
+package org.apache.shardingsphere.scaling.mysql;
 
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
+import org.junit.Test;
 
-/**
- * Finished position.
- */
-public final class FinishedPosition implements InventoryPosition {
+public final class MySQLBinlogDumperTest {
     
-    @Override
-    public JsonElement toJson() {
-        return new JsonObject();
-    }
+    @Test
+    public void assertStart() {
     
-    @Override
-    public int compareTo(final Position o) {
-        return 0;
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
index ddd4d01..b417dfe 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
@@ -82,9 +82,9 @@ public final class MySQLDataSourceCheckerTest {
         dataSourceChecker.checkPrivilege(dataSources);
         verify(preparedStatement, Mockito.times(1)).executeQuery();
     }
-
+    
     @Test
-    public void assertCheckPrivilegeFailure() throws SQLException {
+    public void assertCheckPrivilegeLackPrivileges() throws SQLException {
         when(resultSet.next()).thenReturn(false);
         try {
             dataSourceChecker.checkPrivilege(dataSources);
@@ -94,6 +94,16 @@ public final class MySQLDataSourceCheckerTest {
     }
     
     @Test
+    public void assertCheckPrivilegeFailure() throws SQLException {
+        when(resultSet.next()).thenThrow(new SQLException());
+        try {
+            dataSourceChecker.checkPrivilege(dataSources);
+        } catch (final PrepareFailedException ex) {
+            assertThat(ex.getMessage(), is("Source datasource check privileges 
failed."));
+        }
+    }
+    
+    @Test
     public void assertCheckVariableSuccess() throws SQLException {
         when(resultSet.next()).thenReturn(true, true);
         when(resultSet.getString(2)).thenReturn("ON", "ROW");
@@ -102,14 +112,23 @@ public final class MySQLDataSourceCheckerTest {
     }
     
     @Test
-    public void assertCheckVariableFailure() throws SQLException {
+    public void assertCheckVariableWithWrongVariable() throws SQLException {
         when(resultSet.next()).thenReturn(true, true);
         when(resultSet.getString(2)).thenReturn("OFF", "ROW");
         try {
             dataSourceChecker.checkVariable(dataSources);
-        } catch (final PrepareFailedException checkFailedEx) {
-            assertThat(checkFailedEx.getMessage(), is("Source datasource 
required LOG_BIN = ON, now is OFF"));
+        } catch (final PrepareFailedException ex) {
+            assertThat(ex.getMessage(), is("Source datasource required LOG_BIN 
= ON, now is OFF"));
         }
     }
     
+    @Test
+    public void assertCheckVariableFailure() throws SQLException {
+        when(resultSet.next()).thenThrow(new SQLException());
+        try {
+            dataSourceChecker.checkVariable(dataSources);
+        } catch (final PrepareFailedException ex) {
+            assertThat(ex.getMessage(), is("Source datasource check variables 
failed."));
+        }
+    }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
new file mode 100644
index 0000000..b048129
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.mysql;
+
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MySQLImporterTest {
+    
+    @Mock
+    private ImporterConfiguration importerConfiguration;
+    
+    @Mock
+    private DataSourceManager dataSourceManager;
+    
+    @Test
+    public void assertCreateSqlBuilder() {
+        MySQLImporter mySQLImporter = new MySQLImporter(importerConfiguration, 
dataSourceManager);
+        String insertSQL = 
mySQLImporter.createSqlBuilder().buildInsertSQL(mockDataRecord());
+        assertThat(insertSQL, is("INSERT INTO `t_order`(`id`,`name`) 
VALUES(?,?)"));
+    }
+    
+    private DataRecord mockDataRecord() {
+        DataRecord result = new DataRecord(new BinlogPosition("binlog-000001", 
4), 2);
+        result.setTableName("t_order");
+        result.addColumn(new Column("id", 1, true, true));
+        result.addColumn(new Column("name", "", true, false));
+        return result;
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
new file mode 100644
index 0000000..506170e
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.mysql;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import 
org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
+import 
org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class MySQLJdbcDumperTest {
+    
+    private DataSourceManager dataSourceManager;
+    
+    private MySQLJdbcDumper mySQLJdbcDumper;
+    
+    @Before
+    public void setUp() {
+        dataSourceManager = new DataSourceManager();
+        mySQLJdbcDumper = new 
MySQLJdbcDumper(mockInventoryDumperConfiguration(), dataSourceManager);
+    }
+    
+    private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
+        DumperConfiguration dumperConfiguration = mockDumperConfiguration();
+        initTableData(dumperConfiguration);
+        InventoryDumperConfiguration result = new 
InventoryDumperConfiguration(dumperConfiguration);
+        result.setTableName("t_order");
+        return result;
+    }
+    
+    private DumperConfiguration mockDumperConfiguration() {
+        DumperConfiguration dumperConfiguration = new DumperConfiguration();
+        dumperConfiguration.setDataSourceConfiguration(
+                new 
JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL",
 "root", "root"));
+        return dumperConfiguration;
+    }
+    
+    @SneakyThrows(SQLException.class)
+    private void initTableData(final DumperConfiguration dumperConfig) {
+        DataSource dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
+        try (Connection connection = dataSource.getConnection();
+             Statement statement = connection.createStatement()) {
+            statement.execute("DROP TABLE IF EXISTS t_order");
+            statement.execute("CREATE TABLE t_order (id INT PRIMARY KEY, 
user_id VARCHAR(12))");
+            statement.execute("INSERT INTO t_order (id, user_id) VALUES (1, 
'xxx'), (999, 'yyy')");
+        }
+    }
+    
+    @Test
+    @SneakyThrows(SQLException.class)
+    public void assertReadValue() {
+        ResultSet resultSet = mock(ResultSet.class);
+        ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
+        when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+        when(resultSetMetaData.getColumnType(1)).thenReturn(Types.TIMESTAMP);
+        when(resultSetMetaData.getColumnType(2)).thenReturn(Types.VARCHAR);
+        mySQLJdbcDumper.readValue(resultSet, 1);
+        mySQLJdbcDumper.readValue(resultSet, 2);
+        verify(resultSet).getString(1);
+        verify(resultSet).getObject(2);
+    }
+    
+    @Test
+    @SneakyThrows(SQLException.class)
+    public void assertCreatePreparedStatement() {
+        DataSource dataSource = 
dataSourceManager.getDataSource(mockDumperConfiguration().getDataSourceConfiguration());
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
mySQLJdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
+            assertThat(preparedStatement.getFetchSize(), is(100));
+        }
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
index 9ce18cc..d0bfcca 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.mysql;
 
+import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,7 +52,8 @@ public final class MySQLPositionManagerTest {
     private Connection connection;
     
     @Before
-    public void setUp() throws Exception {
+    @SneakyThrows(SQLException.class)
+    public void setUp() {
         when(dataSource.getConnection()).thenReturn(connection);
         PreparedStatement positionStatement = mockPositionStatement();
         when(connection.prepareStatement("SHOW MASTER 
STATUS")).thenReturn(positionStatement);
@@ -69,6 +71,14 @@ public final class MySQLPositionManagerTest {
     }
     
     @Test
+    public void assertInitPositionByJson() {
+        MySQLPositionManager mysqlPositionManager = new 
MySQLPositionManager(new BinlogPosition(LOG_FILE_NAME, 
LOG_POSITION).toJson().toString());
+        BinlogPosition actual = mysqlPositionManager.getPosition();
+        assertThat(actual.getFilename(), is(LOG_FILE_NAME));
+        assertThat(actual.getPosition(), is(LOG_POSITION));
+    }
+    
+    @Test
     public void assertUpdateCurrentPosition() {
         MySQLPositionManager mysqlPositionManager = new 
MySQLPositionManager(dataSource);
         BinlogPosition expected = new BinlogPosition(LOG_FILE_NAME, 
LOG_POSITION, SERVER_ID);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
index 3d9d525..d5023bf 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
@@ -47,7 +47,7 @@ public final class PostgreSQLPositionManager extends 
BasePositionManager<WalPosi
     }
     
     public PostgreSQLPositionManager(final String position) {
-        setPosition(new WalPosition(LogSequenceNumber.valueOf(position)));
+        setPosition(new 
WalPosition(LogSequenceNumber.valueOf(Long.parseLong(position))));
     }
     
     @Override
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
index 09840e1..8965bc5 100755
--- 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
 import 
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
 import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalEventConverter;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
@@ -80,7 +81,7 @@ public final class PostgreSQLWalDumper extends 
AbstractShardingScalingExecutor<W
             while (isRunning()) {
                 ByteBuffer msg = stream.readPending();
                 if (msg == null) {
-                    sleep();
+                    ThreadUtil.sleep(10L);
                     continue;
                 }
                 AbstractWalEvent event = decodingPlugin.decode(msg, 
stream.getLastReceiveLSN());
@@ -91,13 +92,6 @@ public final class PostgreSQLWalDumper extends 
AbstractShardingScalingExecutor<W
         }
     }
     
-    private void sleep() {
-        try {
-            Thread.sleep(10L);
-        } catch (final InterruptedException ignored) {
-        }
-    }
-    
     private void pushRecord(final Record record) {
         try {
             channel.pushRecord(record);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
index f74c487..111b1c1 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.postgresql;
 
+import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,7 +55,8 @@ public final class PostgreSQLPositionManagerTest {
     private DatabaseMetaData databaseMetaData;
     
     @Before
-    public void setUp() throws Exception {
+    @SneakyThrows(SQLException.class)
+    public void setUp() {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.getMetaData()).thenReturn(databaseMetaData);
         PreparedStatement postgreSQL96LsnPs = mockPostgreSQL96Lsn();
@@ -66,7 +68,15 @@ public final class PostgreSQLPositionManagerTest {
     }
     
     @Test
-    public void assertGetCurrentPositionOnPostgreSQL96() throws SQLException {
+    public void assertInitPositionByJson() {
+        PostgreSQLPositionManager postgreSQLPositionManager = new 
PostgreSQLPositionManager("100");
+        WalPosition actual = postgreSQLPositionManager.getPosition();
+        assertThat(actual.getLogSequenceNumber().asLong(), 
is(LogSequenceNumber.valueOf(100L).asLong()));
+    }
+    
+    @Test
+    @SneakyThrows(SQLException.class)
+    public void assertGetCurrentPositionOnPostgreSQL96() {
         PostgreSQLPositionManager postgreSQLPositionManager = new 
PostgreSQLPositionManager(dataSource);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
@@ -75,7 +85,8 @@ public final class PostgreSQLPositionManagerTest {
     }
     
     @Test
-    public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
+    @SneakyThrows(SQLException.class)
+    public void assertGetCurrentPositionOnPostgreSQL10() {
         PostgreSQLPositionManager postgreSQLPositionManager = new 
PostgreSQLPositionManager(dataSource);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
         WalPosition actual = postgreSQLPositionManager.getPosition();
@@ -83,7 +94,8 @@ public final class PostgreSQLPositionManagerTest {
     }
     
     @Test(expected = RuntimeException.class)
-    public void assertGetCurrentPositionThrowException() throws SQLException {
+    @SneakyThrows(SQLException.class)
+    public void assertGetCurrentPositionThrowException() {
         PostgreSQLPositionManager postgreSQLPositionManager = new 
PostgreSQLPositionManager(dataSource);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
@@ -98,7 +110,8 @@ public final class PostgreSQLPositionManagerTest {
         assertThat(postgreSQLPositionManager.getPosition(), is(expected));
     }
     
-    private PreparedStatement mockPostgreSQL96Lsn() throws SQLException {
+    @SneakyThrows(SQLException.class)
+    private PreparedStatement mockPostgreSQL96Lsn() {
         PreparedStatement result = mock(PreparedStatement.class);
         ResultSet resultSet = mock(ResultSet.class);
         when(result.executeQuery()).thenReturn(resultSet);
@@ -107,7 +120,8 @@ public final class PostgreSQLPositionManagerTest {
         return result;
     }
     
-    private PreparedStatement mockPostgreSQL10Lsn() throws SQLException {
+    @SneakyThrows(SQLException.class)
+    private PreparedStatement mockPostgreSQL10Lsn() {
         PreparedStatement result = mock(PreparedStatement.class);
         ResultSet resultSet = mock(ResultSet.class);
         when(result.executeQuery()).thenReturn(resultSet);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
new file mode 100644
index 0000000..c13ace9
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql;
+
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import 
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.junit.Test;
+import org.postgresql.replication.LogSequenceNumber;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class PostgreSQLSqlBuilderTest {
+    
+    @Test
+    public void assertBuildInsertSQL() {
+        String actual = new 
PostgreSQLSqlBuilder().buildInsertSQL(mockDataRecord());
+        assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\") 
VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
+    }
+    
+    private DataRecord mockDataRecord() {
+        DataRecord result = new DataRecord(new 
WalPosition(LogSequenceNumber.valueOf(100L)), 2);
+        result.setTableName("t_order");
+        result.addColumn(new Column("id", 1, true, true));
+        result.addColumn(new Column("name", "", true, false));
+        return result;
+    }
+}

Reply via email to