This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 11a9fa30f30 Refactor PrimaryKeyPosition (#27191)
11a9fa30f30 is described below
commit 11a9fa30f30710269c999cc00c049baa281c66be
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Jul 14 12:38:49 2023 +0800
Refactor PrimaryKeyPosition (#27191)
---
.../position/{ => pk}/PrimaryKeyPosition.java | 29 ++++++++++++++--------
.../{ => pk}/PrimaryKeyPositionFactory.java | 5 +++-
.../{ => pk/type}/IntegerPrimaryKeyPosition.java | 15 +++++++----
.../{ => pk/type}/StringPrimaryKeyPosition.java | 15 +++++++----
.../{ => pk/type}/UnsupportedKeyPosition.java | 15 +++++++----
.../YamlJobItemInventoryTasksProgressSwapper.java | 2 +-
.../data/pipeline/core/dumper/InventoryDumper.java | 4 +--
.../core/preparer/InventoryTaskSplitter.java | 6 ++---
.../type}/IntegerPrimaryKeyPositionTest.java | 3 ++-
.../type}/StringPrimaryKeyPositionTest.java | 3 ++-
.../{ => pk/type}/UnsupportedKeyPositionTest.java | 3 ++-
.../InventoryIncrementalJobItemProgressTest.java | 2 +-
.../cdc/util/DataRecordResultConvertUtilsTest.java | 2 +-
.../core/prepare/InventoryTaskSplitterTest.java | 2 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 2 +-
15 files changed, 68 insertions(+), 40 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/PrimaryKeyPosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/PrimaryKeyPosition.java
similarity index 72%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/PrimaryKeyPosition.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/PrimaryKeyPosition.java
index c27d779fa11..5c89038387e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/PrimaryKeyPosition.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/PrimaryKeyPosition.java
@@ -15,35 +15,42 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.common.ingest.position;
+package org.apache.shardingsphere.data.pipeline.common.ingest.position.pk;
+
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
/**
* Primary key position.
*
* @param <T> type of value
*/
-public abstract class PrimaryKeyPosition<T> {
+public interface PrimaryKeyPosition<T> extends IngestPosition {
/**
* Get begin value.
*
* @return begin value
*/
- public abstract T getBeginValue();
+ T getBeginValue();
/**
* Get end value.
*
* @return end value
*/
- public abstract T getEndValue();
-
- protected abstract T convert(String value);
+ T getEndValue();
- protected abstract char getType();
+ /**
+ * Convert value.
+ * @param value value to be converted
+ * @return converted value
+ */
+ T convert(String value);
- @Override
- public final String toString() {
- return String.format("%s,%s,%s", getType(), null != getBeginValue() ?
getBeginValue() : "", null != getEndValue() ? getEndValue() : "");
- }
+ /**
+ * Get type.
+ *
+ * @return type
+ */
+ char getType();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/PrimaryKeyPositionFactory.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/PrimaryKeyPositionFactory.java
similarity index 90%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/PrimaryKeyPositionFactory.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/PrimaryKeyPositionFactory.java
index 07b75fa3d3c..20b454a825b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/PrimaryKeyPositionFactory.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/PrimaryKeyPositionFactory.java
@@ -15,13 +15,16 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.common.ingest.position;
+package org.apache.shardingsphere.data.pipeline.common.ingest.position.pk;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.StringPrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition;
import java.util.List;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/IntegerPrimaryKeyPosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/IntegerPrimaryKeyPosition.java
similarity index 78%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/IntegerPrimaryKeyPosition.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/IntegerPrimaryKeyPosition.java
index 32c3b0f8f07..b2f97dc93b0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/IntegerPrimaryKeyPosition.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/IntegerPrimaryKeyPosition.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.common.ingest.position;
+package org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPosition;
/**
* Integer primary key position.
*/
-public final class IntegerPrimaryKeyPosition extends PrimaryKeyPosition<Long>
implements IngestPosition {
+public final class IntegerPrimaryKeyPosition implements
PrimaryKeyPosition<Long> {
private final long beginValue;
@@ -44,12 +44,17 @@ public final class IntegerPrimaryKeyPosition extends
PrimaryKeyPosition<Long> im
}
@Override
- protected Long convert(final String value) {
+ public Long convert(final String value) {
return Long.parseLong(value);
}
@Override
- protected char getType() {
+ public char getType() {
return 'i';
}
+
+ @Override
+ public String toString() {
+ return String.format("%s,%s,%s", getType(), beginValue, endValue);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/StringPrimaryKeyPosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/StringPrimaryKeyPosition.java
similarity index 72%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/StringPrimaryKeyPosition.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/StringPrimaryKeyPosition.java
index defe46e2f65..33e906b3f85 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/StringPrimaryKeyPosition.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/StringPrimaryKeyPosition.java
@@ -15,30 +15,35 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.common.ingest.position;
+package org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPosition;
/**
* String primary key position.
*/
@RequiredArgsConstructor
@Getter
-public final class StringPrimaryKeyPosition extends PrimaryKeyPosition<String>
implements IngestPosition {
+public final class StringPrimaryKeyPosition implements
PrimaryKeyPosition<String> {
private final String beginValue;
private final String endValue;
@Override
- protected String convert(final String value) {
+ public String convert(final String value) {
return value;
}
@Override
- protected char getType() {
+ public char getType() {
return 's';
}
+
+ @Override
+ public String toString() {
+ return String.format("%s,%s,%s", getType(), null == beginValue ? "" :
beginValue, null == endValue ? "" : endValue);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/UnsupportedKeyPosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/UnsupportedKeyPosition.java
similarity index 76%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/UnsupportedKeyPosition.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/UnsupportedKeyPosition.java
index 022e094ee9f..8bd5da021fc 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/UnsupportedKeyPosition.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/UnsupportedKeyPosition.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.common.ingest.position;
+package org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPosition;
/**
* Unsupported key position.
*/
-public final class UnsupportedKeyPosition extends PrimaryKeyPosition<Void>
implements IngestPosition {
+public final class UnsupportedKeyPosition implements PrimaryKeyPosition<Void> {
@Override
public Void getBeginValue() {
@@ -35,12 +35,17 @@ public final class UnsupportedKeyPosition extends
PrimaryKeyPosition<Void> imple
}
@Override
- protected Void convert(final String value) {
+ public Void convert(final String value) {
throw new UnsupportedOperationException();
}
@Override
- protected char getType() {
+ public char getType() {
return 'u';
}
+
+ @Override
+ public String toString() {
+ return String.format("%s,,", getType());
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
index 899cf6249af..c2685d75f09 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
import com.google.common.base.Strings;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PrimaryKeyPositionFactory;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPositionFactory;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemInventoryTasksProgress;
import
org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 6717c6d8a44..52e80609743 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -38,8 +38,8 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableM
import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PrimaryKeyPosition;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PrimaryKeyPositionFactory;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPositionFactory;
import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
import
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index 81de591f8f7..9461e8bfd80 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -31,10 +31,10 @@ import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineRea
import
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
import
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.position.IntegerPrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.position.StringPrimaryKeyPosition;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.position.UnsupportedKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.StringPrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataUtils;
import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineSQLBuilderEngine;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/IntegerPrimaryKeyPositionTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/IntegerPrimaryKeyPositionTest.java
similarity index 92%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/IntegerPrimaryKeyPositionTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/IntegerPrimaryKeyPositionTest.java
index 7b586f39eea..96eef556484 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/IntegerPrimaryKeyPositionTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/IntegerPrimaryKeyPositionTest.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.common.ingest.position;
+package org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPositionFactory;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/StringPrimaryKeyPositionTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/StringPrimaryKeyPositionTest.java
similarity index 92%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/StringPrimaryKeyPositionTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/StringPrimaryKeyPositionTest.java
index bcdd87b9261..d1e972c5cb6 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/StringPrimaryKeyPositionTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/StringPrimaryKeyPositionTest.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.common.ingest.position;
+package org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPositionFactory;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/UnsupportedKeyPositionTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/UnsupportedKeyPositionTest.java
similarity index 92%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/UnsupportedKeyPositionTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/UnsupportedKeyPositionTest.java
index 058091e3193..eb812a1c2e8 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/UnsupportedKeyPositionTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/position/pk/type/UnsupportedKeyPositionTest.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.common.ingest.position;
+package org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPositionFactory;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
index a154af9dc7f..823b7aa50d6 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.common.job.progress;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.position.IntegerPrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index 003f7c2dd68..bf8cbf4a133 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.Builder;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.position.IntegerPrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 8f3109c1d25..5212fcfa6d2 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -22,7 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.position.IntegerPrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataUtils;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index cfe5b794ce0..54fa3ddb6c6 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -26,7 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.position.IntegerPrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;