This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b53638656c8 [FLINK-39458][table] Move type converters and serializers into new flink-table-type-utils module
b53638656c8 is described below
commit b53638656c875e5655cc9b960341ff2a3b869855
Author: Mika Naylor <[email protected]>
AuthorDate: Fri Apr 24 09:29:07 2026 +0200
[FLINK-39458][table] Move type converters and serializers into new flink-table-type-utils module
This closes #27980.
---
.../flink/core}/memory/AbstractPagedInputView.java | 5 +-
.../core}/memory/AbstractPagedOutputView.java | 7 +-
.../core}/memory/ListMemorySegmentSource.java | 5 +-
.../flink/core/memory}/RandomAccessInputView.java | 5 +-
.../flink/core/memory}/RandomAccessOutputView.java | 5 +-
.../runtime/io/disk/FileChannelInputView.java | 2 +-
.../runtime/io/disk/FileChannelOutputView.java | 2 +-
.../io/disk/SeekableFileChannelInputView.java | 2 +-
.../io/disk/SimpleCollectingOutputView.java | 2 +-
.../flink/runtime/io/disk/SpillingBuffer.java | 3 +-
.../iomanager/AbstractChannelReaderInputView.java | 2 +-
.../iomanager/AbstractChannelWriterOutputView.java | 2 +-
.../io/disk/iomanager/ChannelReaderInputView.java | 2 +-
.../io/disk/iomanager/ChannelWriterOutputView.java | 2 +-
.../iterative/io/SerializedUpdateBuffer.java | 4 +-
.../flink/runtime/operators/TempBarrier.java | 2 +-
.../operators/hash/CompactingHashTable.java | 2 +-
.../runtime/operators/hash/HashPartition.java | 6 +-
.../runtime/operators/hash/InMemoryPartition.java | 6 +-
.../operators/hash/InPlaceMutableHashTable.java | 4 +-
.../AbstractBlockResettableIterator.java | 4 +-
.../resettable/SpillingResettableIterator.java | 2 +-
.../SpillingResettableMutableObjectIterator.java | 2 +-
.../operators/sort/FixedLengthRecordSorter.java | 4 +-
.../operators/sort/NormalizedKeySorter.java | 4 +-
.../flink/runtime/io/disk/SpillingBufferTest.java | 2 +-
.../network/api/serialization/PagedViewsTest.java | 4 +-
flink-table/flink-table-planner/pom.xml | 8 +
flink-table/flink-table-runtime/pom.xml | 16 ++
.../runtime/hashtable/BinaryHashBucketArea.java | 2 +-
.../runtime/hashtable/BinaryHashPartition.java | 6 +-
.../table/runtime/hashtable/LongHashPartition.java | 4 +-
.../sort/AbstractBinaryExternalMerger.java | 2 +-
.../operators/sort/BinaryExternalMerger.java | 2 +-
.../operators/sort/BinaryIndexedSortable.java | 4 +-
.../operators/sort/BinaryKVExternalMerger.java | 2 +-
.../operators/sort/BinaryKVInMemorySortBuffer.java | 4 +-
.../runtime/typeutils/WindowKeySerializer.java | 4 +-
.../runtime/util/ResettableExternalBuffer.java | 2 +-
.../collections/binary/AbstractBytesHashMap.java | 4 +-
.../collections/binary/AbstractBytesMultiMap.java | 6 +-
.../apache/flink/table/data/BinaryRowDataTest.java | 4 +-
.../collections/binary/BytesHashMapTestBase.java | 2 +-
flink-table/flink-table-type-utils/pom.xml | 110 ++++++++++
.../conversion/ArrayBooleanArrayConverter.java | 0
.../data/conversion/ArrayByteArrayConverter.java | 0
.../data/conversion/ArrayDoubleArrayConverter.java | 0
.../data/conversion/ArrayFloatArrayConverter.java | 0
.../data/conversion/ArrayIntArrayConverter.java | 0
.../table/data/conversion/ArrayListConverter.java | 0
.../data/conversion/ArrayLongArrayConverter.java | 0
.../data/conversion/ArrayObjectArrayConverter.java | 0
.../data/conversion/ArrayShortArrayConverter.java | 0
.../data/conversion/BitmapBitmapConverter.java | 0
.../data/conversion/DataStructureConverter.java | 0
.../data/conversion/DataStructureConverters.java | 0
.../table/data/conversion/DateDateConverter.java | 0
.../data/conversion/DateLocalDateConverter.java | 0
.../DayTimeIntervalDurationConverter.java | 0
.../conversion/DecimalBigDecimalConverter.java | 0
.../table/data/conversion/IdentityConverter.java | 0
.../LocalZonedTimestampInstantConverter.java | 0
.../LocalZonedTimestampIntConverter.java | 0
.../LocalZonedTimestampLongConverter.java | 0
.../LocalZonedTimestampTimestampConverter.java | 0
.../table/data/conversion/MapMapConverter.java | 0
.../data/conversion/RawByteArrayConverter.java | 0
.../table/data/conversion/RawObjectConverter.java | 0
.../table/data/conversion/RowRowConverter.java | 0
.../data/conversion/StringByteArrayConverter.java | 0
.../data/conversion/StringStringConverter.java | 0
.../data/conversion/StructuredObjectConverter.java | 0
.../data/conversion/TimeLocalTimeConverter.java | 0
.../table/data/conversion/TimeLongConverter.java | 0
.../table/data/conversion/TimeTimeConverter.java | 0
.../TimestampLocalDateTimeConverter.java | 0
.../conversion/TimestampTimestampConverter.java | 0
.../YearMonthIntervalPeriodConverter.java | 0
.../apache/flink/table/data/util/MapDataUtil.java | 0
.../table/data/writer/AbstractBinaryWriter.java | 0
.../flink/table/data/writer/BinaryArrayWriter.java | 0
.../flink/table/data/writer/BinaryRowWriter.java | 0
.../flink/table/data/writer/BinaryWriter.java | 0
.../table/runtime/generated/CompileUtils.java | 240 +++++++++++++++++++++
.../runtime/typeutils/AbstractMapSerializer.java | 0
.../typeutils/AbstractRowDataSerializer.java | 0
.../runtime/typeutils/ArrayDataSerializer.java | 0
.../runtime/typeutils/BinaryRowDataSerializer.java | 4 +-
.../runtime/typeutils/DecimalDataSerializer.java | 0
.../runtime/typeutils/DecimalDataTypeInfo.java | 0
.../runtime/typeutils/ExternalSerializer.java | 0
.../table/runtime/typeutils/ExternalTypeInfo.java | 0
.../runtime/typeutils/InternalSerializers.java | 0
.../table/runtime/typeutils/InternalTypeInfo.java | 0
.../table/runtime/typeutils/MapDataSerializer.java | 0
.../runtime/typeutils/PagedTypeSerializer.java | 4 +-
.../runtime/typeutils/RawValueDataSerializer.java | 0
.../table/runtime/typeutils/RowDataSerializer.java | 4 +-
.../runtime/typeutils/StringDataSerializer.java | 0
.../runtime/typeutils/TimestampDataSerializer.java | 0
.../table/data/DataStructureConvertersTest.java | 0
.../runtime/typeutils/ArrayDataSerializerTest.java | 0
.../runtime/typeutils/BinaryRowSerializerTest.java | 0
.../runtime/typeutils/DecimalSerializerTest.java | 0
.../runtime/typeutils/ExternalSerializerTest.java | 0
.../runtime/typeutils/MapDataSerializerTest.java | 0
.../typeutils/RawValueDataSerializerTest.java | 0
.../runtime/typeutils/RowDataSerializerTest.java | 0
.../typeutils/StringDataSerializerTest.java | 0
.../typeutils/TimestampDataSerializerTest.java | 0
.../flink/table/utils/RawValueDataAsserter.java | 0
flink-table/pom.xml | 1 +
112 files changed, 443 insertions(+), 84 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
b/flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedInputView.java
similarity index 99%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
rename to
flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedInputView.java
index b9b23631f67..eb06a16a2c0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
+++
b/flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedInputView.java
@@ -16,10 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.memory;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemorySegment;
+package org.apache.flink.core.memory;
import java.io.EOFException;
import java.io.IOException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
b/flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedOutputView.java
similarity index 98%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
rename to
flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedOutputView.java
index 8d8fdf9c79d..9709ca6d5e5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
+++
b/flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedOutputView.java
@@ -16,12 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.memory;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentWritable;
+package org.apache.flink.core.memory;
import java.io.IOException;
import java.io.UTFDataFormatException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
b/flink-core/src/main/java/org/apache/flink/core/memory/ListMemorySegmentSource.java
similarity index 90%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
rename to
flink-core/src/main/java/org/apache/flink/core/memory/ListMemorySegmentSource.java
index d21d141316c..502fe1b901b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
+++
b/flink-core/src/main/java/org/apache/flink/core/memory/ListMemorySegmentSource.java
@@ -16,10 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.memory;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentSource;
+package org.apache.flink.core.memory;
import java.util.List;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
b/flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessInputView.java
similarity index 93%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
rename to
flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessInputView.java
index a12cf7704eb..bead6fee2bd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
+++
b/flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessInputView.java
@@ -16,11 +16,8 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.io.disk;
+package org.apache.flink.core.memory;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.SeekableDataInputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
import org.apache.flink.util.MathUtils;
import java.io.EOFException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
b/flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessOutputView.java
similarity index 91%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
rename to
flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessOutputView.java
index 3ce326e6892..7e348da0cdf 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
+++
b/flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessOutputView.java
@@ -16,11 +16,8 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.io.disk;
+package org.apache.flink.core.memory;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.SeekableDataOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.util.MathUtils;
import java.io.EOFException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
index f963235b707..67da217fd3d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.io.disk;
+import org.apache.flink.core.memory.AbstractPagedInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.util.MathUtils;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
index f415867c281..88564c4b0e5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.io.disk;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.runtime.memory.MemoryManager;
import java.io.IOException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
index 1613765b343..d9d1ae128c8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
@@ -18,11 +18,11 @@
package org.apache.flink.runtime.io.disk;
+import org.apache.flink.core.memory.AbstractPagedInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.util.MathUtils;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
index 514d1343812..bb1b437d6bf 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.io.disk;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentSource;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.util.MathUtils;
import java.io.EOFException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
index 1c576346810..8838b4bcfed 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
@@ -18,14 +18,15 @@
package org.apache.flink.runtime.io.disk;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentSource;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import java.io.IOException;
import java.util.ArrayList;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java
index e9f6cdff775..2ceb54252a4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.io.disk.iomanager;
+import org.apache.flink.core.memory.AbstractPagedInputView;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
import java.io.IOException;
import java.util.List;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java
index 9e8f34d2c91..fd6be9d1a8e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.disk.iomanager;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import java.io.IOException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
index 0d7252c3097..d8a8edb1f54 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.io.disk.iomanager;
+import org.apache.flink.core.memory.AbstractPagedInputView;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
import java.io.EOFException;
import java.io.IOException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
index 465bd6d9c18..3be7bca27ca 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.io.disk.iomanager;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import java.io.IOException;
import java.util.ArrayList;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index f6abefd7299..a731d1e0cd5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -18,13 +18,13 @@
package org.apache.flink.runtime.iterative.io;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import java.io.EOFException;
import java.io.IOException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
index c248052169e..ce2435b5825 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
@@ -21,12 +21,12 @@ package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.io.disk.SpillingBuffer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 7a999e05ab2..a13291107c7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.util.IntArrayList;
import org.apache.flink.runtime.util.LongArrayList;
import org.apache.flink.util.MathUtils;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index f43da4eba24..32c605e51f4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -20,17 +20,17 @@ package org.apache.flink.runtime.operators.hash;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentSource;
+import org.apache.flink.core.memory.RandomAccessOutputView;
import org.apache.flink.core.memory.SeekableDataInputView;
import org.apache.flink.core.memory.SeekableDataOutputView;
-import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
index d6237a180aa..95ea05d68f8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
@@ -19,12 +19,12 @@
package org.apache.flink.runtime.operators.hash;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentSource;
import org.apache.flink.core.memory.SeekableDataInputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import java.io.EOFException;
import java.io.IOException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
index b3736e55774..32de886882a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.common.typeutils.SameTypePairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
index 52a5d140ae8..91caa2450e0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
@@ -19,11 +19,11 @@
package org.apache.flink.runtime.operators.resettable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.util.MemoryBlockIterator;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIterator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIterator.java
index b421a84647c..a34383ce281 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIterator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIterator.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.operators.resettable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.SpillingBuffer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.util.ResettableIterator;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
index e33ce4fe245..46c88635270 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.operators.resettable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.SpillingBuffer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index 646e00bb759..76baa914dbd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -20,10 +20,10 @@ package org.apache.flink.runtime.operators.sort;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.util.MutableObjectIterator;
import java.io.EOFException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index 81b5d2481e9..48a12ed2f05 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.operators.sort;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index e600da1c864..4af75e81824 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.io.disk;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
index 0257725647f..e00b5f11bdd 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
@@ -18,10 +18,10 @@
package org.apache.flink.runtime.io.network.api.serialization;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import
org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
diff --git a/flink-table/flink-table-planner/pom.xml
b/flink-table/flink-table-planner/pom.xml
index 27f357eb88c..e37ffce9569 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -224,6 +224,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-type-utils</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
diff --git a/flink-table/flink-table-runtime/pom.xml
b/flink-table/flink-table-runtime/pom.xml
index e3cdcaa4f3b..9958fc66f1b 100644
--- a/flink-table/flink-table-runtime/pom.xml
+++ b/flink-table/flink-table-runtime/pom.xml
@@ -52,6 +52,13 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-type-utils</artifactId>
+ <version>${project.version}</version>
+ <optional>${flink.markBundledAsOptional}</optional>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
@@ -131,6 +138,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-type-utils</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-migration-test-utils</artifactId>
@@ -170,6 +185,7 @@ under the License.
<include>org.apache.flink:flink-shaded-jsonpath</include>
<include>org.codehaus.janino:*</include>
<include>org.apache.flink:flink-table-code-splitter</include>
+ <include>org.apache.flink:flink-table-type-utils</include>
</includes>
</artifactSet>
</configuration>
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
index aa9e6a27ede..82d13257114 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.hashtable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
import org.apache.flink.util.MathUtils;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashPartition.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashPartition.java
index 20a42c5e730..c3810ea531e 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashPartition.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashPartition.java
@@ -18,18 +18,18 @@
package org.apache.flink.table.runtime.hashtable;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentSource;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.core.memory.SeekableDataInputView;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
import
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.FileChannelUtil;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
index a8d9ad9b02a..7ab438d538f 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.runtime.hashtable;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.SeekableDataInputView;
@@ -27,8 +29,6 @@ import
org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.FileChannelUtil;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
index cc0a344d890..ab7b3113558 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
@@ -18,12 +18,12 @@
package org.apache.flink.table.runtime.operators.sort;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
import
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.runtime.io.ChannelWithMeta;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.util.MutableObjectIterator;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
index 74467c5c761..a0ce9c620be 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
@@ -18,11 +18,11 @@
package org.apache.flink.table.runtime.operators.sort;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
import
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryIndexedSortable.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryIndexedSortable.java
index b7072f18175..56d9b5d1c69 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryIndexedSortable.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryIndexedSortable.java
@@ -18,9 +18,9 @@
package org.apache.flink.table.runtime.operators.sort;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.runtime.operators.sort.IndexedSortable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
index db98dfd77fe..9b1a5457c6c 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
@@ -18,10 +18,10 @@
package org.apache.flink.table.runtime.operators.sort;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
index 38f721e355f..7e4a1c6f4e9 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
@@ -18,9 +18,9 @@
package org.apache.flink.table.runtime.operators.sort;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java
index b4175cc7157..9d0cbb57602 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java
@@ -21,10 +21,10 @@ package org.apache.flink.table.runtime.typeutils;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.util.WindowKey;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java
index 9a2746fe316..bca6dc00852 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.util;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
index 61da810d36e..2bf6b35a859 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
@@ -19,11 +19,11 @@
package org.apache.flink.table.runtime.util.collections.binary;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.AbstractPagedInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
index dd1c9f760f2..b6d1fdfea5e 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
@@ -18,11 +18,11 @@
package org.apache.flink.table.runtime.util.collections.binary;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
index 3344997ea03..18b756f9ea9 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
@@ -31,8 +31,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
-import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessOutputView;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.binary.BinaryArrayData;
import org.apache.flink.table.data.binary.BinaryMapData;
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
index d6620acfa7d..9a1e04aecbb 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
@@ -20,7 +20,7 @@ package
org.apache.flink.table.runtime.util.collections.binary;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.table.data.binary.BinaryRowData;
diff --git a/flink-table/flink-table-type-utils/pom.xml
b/flink-table/flink-table-type-utils/pom.xml
new file mode 100644
index 00000000000..43f0b6e9b29
--- /dev/null
+++ b/flink-table/flink-table-type-utils/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table</artifactId>
+ <version>2.4-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-table-type-utils</artifactId>
+ <name>Flink : Table : Type Utilities</name>
+ <description>
+ This module contains type utilities and conversion stack for the Table API,
+ including data structure converters and serializers.
+ </description>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <!-- Core dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <optional>${flink.markBundledAsOptional}</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>commons-compiler</artifactId>
+ <optional>${flink.markBundledAsOptional}</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <optional>${flink.markBundledAsOptional}</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-guava</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/BitmapBitmapConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/BitmapBitmapConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/BitmapBitmapConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/BitmapBitmapConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampTimestampConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampTimestampConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampTimestampConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampTimestampConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/MapDataUtil.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/util/MapDataUtil.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/MapDataUtil.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/util/MapDataUtil.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
diff --git
a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
new file mode 100644
index 00000000000..5ac095a8fe4
--- /dev/null
+++
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.generated;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava33.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava33.com.google.common.cache.CacheBuilder;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.ExpressionEvaluator;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utilities to compile generated Java source code into classes and expression evaluators using
+ * Janino.
+ *
+ * <p>Compilation results are kept in two static, size- and time-bounded caches with soft values,
+ * so that repeated compilation of the same generated code within one process is avoided.
+ */
+public final class CompileUtils {
+
+    // Single logger for all generated code, so that it is logged to the same place.
+    private static final Logger CODE_LOG = LoggerFactory.getLogger(CompileUtils.class);
+
+    /**
+     * Cache of compiled classes. Janino creates a new class loader and a new class file on every
+     * compilation (guaranteeing that the class name will not be repeated). Without a cache,
+     * multiple tasks of the same process would generate a large number of duplicate classes,
+     * which results in frequent class unloading (metaspace GC) and thus in performance
+     * bottlenecks.
+     */
+    static final Cache<ClassKey, Class<?>> COMPILED_CLASS_CACHE =
+            CacheBuilder.newBuilder()
+                    // estimated maximum planning/startup time
+                    .expireAfterAccess(Duration.ofMinutes(5))
+                    // estimated cache size
+                    .maximumSize(300)
+                    .softValues()
+                    .build();
+
+    /**
+     * Cache of compiled expressions, keyed by the expression code, its argument names, its
+     * argument classes and its return class.
+     */
+    static final Cache<ExpressionKey, ExpressionEvaluator> COMPILED_EXPRESSION_CACHE =
+            CacheBuilder.newBuilder()
+                    // estimated maximum planning/startup time
+                    .expireAfterAccess(Duration.ofMinutes(5))
+                    // estimated cache size
+                    .maximumSize(100)
+                    .softValues()
+                    .build();
+
+    /** Utility class with static members only; not meant to be instantiated. */
+    private CompileUtils() {}
+
+    /** Triggers internal garbage collection of expired cache entries in both caches. */
+    public static void cleanUp() {
+        COMPILED_CLASS_CACHE.cleanUp();
+        COMPILED_EXPRESSION_CACHE.cleanUp();
+    }
+
+    /**
+     * Compiles generated code to a class. If the same code has already been compiled for a class
+     * loader with the same hash code, the cached class is returned instead.
+     *
+     * @param cl the ClassLoader used to load the class, must not be null
+     * @param name the class name
+     * @param code the generated code
+     * @param <T> the class type
+     * @return the compiled class
+     * @throws FlinkRuntimeException if the class loader is null, or if compiling or loading the
+     *     class fails; the original failure is attached as the cause
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Class<T> compile(ClassLoader cl, String name, String code) {
+        try {
+            // Validate before cl.hashCode() is called below, so that a missing class loader is
+            // reported with a descriptive message instead of a message-less NullPointerException.
+            checkNotNull(cl, "Classloader must not be null.");
+            // The class name is part of the "code" and makes the string unique,
+            // to prevent class leaks we don't cache the class loader directly
+            // but only its hash code
+            final ClassKey classKey = new ClassKey(cl.hashCode(), code);
+            return (Class<T>) COMPILED_CLASS_CACHE.get(classKey, () -> doCompile(cl, name, code));
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Compiles the given code with a fresh {@link SimpleCompiler} whose parent class loader is
+     * {@code cl}, and loads the class {@code name} from the compiler's class loader.
+     *
+     * <p>The class loader is expected to be non-null; this is validated by {@link #compile}.
+     *
+     * @throws InvalidProgramException if Janino fails to compile the code
+     * @throws RuntimeException if the compiled class cannot be found under the given name
+     */
+    private static <T> Class<T> doCompile(ClassLoader cl, String name, String code) {
+        CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code);
+        SimpleCompiler compiler = new SimpleCompiler();
+        compiler.setParentClassLoader(cl);
+        try {
+            compiler.cook(code);
+        } catch (Throwable t) {
+            // NOTE(review): the numbered code is printed to stdout rather than written via
+            // CODE_LOG; confirm that this is intentional (e.g. as a debugging aid).
+            System.out.println(addLineNumber(code));
+            throw new InvalidProgramException(
+                    "Table program cannot be compiled. This is a bug. Please file an issue.", t);
+        }
+        try {
+            //noinspection unchecked
+            return (Class<T>) compiler.getClassLoader().loadClass(name);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Can not load class " + name, e);
+        }
+    }
+
+    /**
+     * Prefixes every line of the code with its line number in a block comment. This gives more
+     * information when an error occurs: generally, when cook fails, it reports which line is
+     * wrong, and that line number starts at 1.
+     */
+    private static String addLineNumber(String code) {
+        String[] lines = code.split("\n");
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < lines.length; i++) {
+            builder.append("/* ").append(i + 1).append(" */").append(lines[i]).append("\n");
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Compiles an expression code to a janino {@link ExpressionEvaluator}. If an expression with
+     * equal code, argument names, argument classes and return class has already been compiled,
+     * the cached evaluator is returned instead.
+     *
+     * @param code the expression code
+     * @param argumentNames the expression argument names
+     * @param argumentClasses the expression argument classes
+     * @param returnClass the return type of the expression
+     * @return the compiled expression evaluator
+     * @throws FlinkRuntimeException if the expression cannot be compiled; the original failure
+     *     is attached as the cause
+     */
+    public static ExpressionEvaluator compileExpression(
+            String code,
+            List<String> argumentNames,
+            List<Class<?>> argumentClasses,
+            Class<?> returnClass) {
+        try {
+            ExpressionKey key =
+                    new ExpressionKey(code, argumentNames, argumentClasses, returnClass);
+            return COMPILED_EXPRESSION_CACHE.get(
+                    key,
+                    () -> {
+                        ExpressionEvaluator expressionEvaluator = new ExpressionEvaluator();
+                        // Input args
+                        expressionEvaluator.setParameters(
+                                argumentNames.toArray(new String[0]),
+                                argumentClasses.toArray(new Class[0]));
+                        // Result type
+                        expressionEvaluator.setExpressionType(returnClass);
+                        try {
+                            // Compile
+                            expressionEvaluator.cook(code);
+                        } catch (CompileException e) {
+                            throw new InvalidProgramException(
+                                    "Table program cannot be compiled. This is a bug. Please file an issue.\nExpression: "
+                                            + code,
+                                    e);
+                        }
+                        return expressionEvaluator;
+                    });
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Class to use as key for the {@link #COMPILED_CLASS_CACHE}. Two keys are equal if both the
+     * class loader hash code and the code string are equal.
+     */
+    private static class ClassKey {
+        // hash code of the class loader; the loader itself is not referenced by the key
+        private final int classLoaderId;
+        private final String code;
+
+        private ClassKey(int classLoaderId, String code) {
+            this.classLoaderId = classLoaderId;
+            this.code = code;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ClassKey classKey = (ClassKey) o;
+            return classLoaderId == classKey.classLoaderId && code.equals(classKey.code);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(classLoaderId, code);
+        }
+    }
+
+    /**
+     * Class to use as key for the {@link #COMPILED_EXPRESSION_CACHE}. Two keys are equal if the
+     * code, the argument names, the argument classes and the return class are all equal.
+     */
+    private static class ExpressionKey {
+        private final String code;
+        // NOTE(review): the lists passed by the caller are stored without copying; this assumes
+        // they are not modified while the key is in the cache - confirm against callers.
+        private final List<String> argumentNames;
+        private final List<Class<?>> argumentClasses;
+        private final Class<?> returnClass;
+
+        private ExpressionKey(
+                String code,
+                List<String> argumentNames,
+                List<Class<?>> argumentClasses,
+                Class<?> returnClass) {
+            this.code = code;
+            this.argumentNames = argumentNames;
+            this.argumentClasses = argumentClasses;
+            this.returnClass = returnClass;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ExpressionKey that = (ExpressionKey) o;
+            return code.equals(that.code)
+                    && argumentNames.equals(that.argumentNames)
+                    && argumentClasses.equals(that.argumentClasses)
+                    && returnClass.equals(that.returnClass);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(code, argumentNames, argumentClasses, returnClass);
+        }
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractMapSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractMapSerializer.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractMapSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractMapSerializer.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractRowDataSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractRowDataSerializer.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractRowDataSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractRowDataSerializer.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
similarity index 99%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
index 324482b8da5..6df8726ed4b 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
+++
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
@@ -22,13 +22,13 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentWritable;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.binary.BinarySegmentUtils;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataTypeInfo.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataTypeInfo.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataTypeInfo.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataTypeInfo.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/InternalSerializers.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/InternalSerializers.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/InternalSerializers.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/InternalSerializers.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/InternalTypeInfo.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/InternalTypeInfo.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/InternalTypeInfo.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/InternalTypeInfo.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
similarity index 97%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
index 8227a862e2b..c5f0ca5fe85 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
+++
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.runtime.typeutils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.table.data.binary.BinaryRowData;
import java.io.IOException;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
similarity index 99%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
index a83ea7a5e55..7a634f67efe 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
+++
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
@@ -26,10 +26,10 @@ import
org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/StringDataSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/StringDataSerializer.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/StringDataSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/StringDataSerializer.java
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializer.java
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializer.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializer.java
rename to
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializer.java
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
rename to
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
rename to
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryRowSerializerTest.java
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryRowSerializerTest.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryRowSerializerTest.java
rename to
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryRowSerializerTest.java
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/DecimalSerializerTest.java
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/DecimalSerializerTest.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/DecimalSerializerTest.java
rename to
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/DecimalSerializerTest.java
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ExternalSerializerTest.java
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/ExternalSerializerTest.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ExternalSerializerTest.java
rename to
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/ExternalSerializerTest.java
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
rename to
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializerTest.java
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializerTest.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializerTest.java
rename to
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializerTest.java
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
rename to
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/StringDataSerializerTest.java
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/StringDataSerializerTest.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/StringDataSerializerTest.java
rename to
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/StringDataSerializerTest.java
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializerTest.java
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializerTest.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializerTest.java
rename to
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializerTest.java
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/utils/RawValueDataAsserter.java
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/utils/RawValueDataAsserter.java
similarity index 100%
rename from
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/utils/RawValueDataAsserter.java
rename to
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/utils/RawValueDataAsserter.java
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index ac72b7ed37a..1cc2ba7b79f 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -33,6 +33,7 @@ under the License.
<modules>
<module>flink-table-common</module>
+ <module>flink-table-type-utils</module>
<module>flink-table-api-java</module>
<module>flink-table-api-scala</module>
<module>flink-table-api-bridge-base</module>