This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b53638656c8 [FLINK-39458][table] Move type converters and serializers 
into new flink-table-type-utils module
b53638656c8 is described below

commit b53638656c875e5655cc9b960341ff2a3b869855
Author: Mika Naylor <[email protected]>
AuthorDate: Fri Apr 24 09:29:07 2026 +0200

    [FLINK-39458][table] Move type converters and serializers into new 
flink-table-type-utils module
    
    This closes #27980.
---
 .../flink/core}/memory/AbstractPagedInputView.java |   5 +-
 .../core}/memory/AbstractPagedOutputView.java      |   7 +-
 .../core}/memory/ListMemorySegmentSource.java      |   5 +-
 .../flink/core/memory}/RandomAccessInputView.java  |   5 +-
 .../flink/core/memory}/RandomAccessOutputView.java |   5 +-
 .../runtime/io/disk/FileChannelInputView.java      |   2 +-
 .../runtime/io/disk/FileChannelOutputView.java     |   2 +-
 .../io/disk/SeekableFileChannelInputView.java      |   2 +-
 .../io/disk/SimpleCollectingOutputView.java        |   2 +-
 .../flink/runtime/io/disk/SpillingBuffer.java      |   3 +-
 .../iomanager/AbstractChannelReaderInputView.java  |   2 +-
 .../iomanager/AbstractChannelWriterOutputView.java |   2 +-
 .../io/disk/iomanager/ChannelReaderInputView.java  |   2 +-
 .../io/disk/iomanager/ChannelWriterOutputView.java |   2 +-
 .../iterative/io/SerializedUpdateBuffer.java       |   4 +-
 .../flink/runtime/operators/TempBarrier.java       |   2 +-
 .../operators/hash/CompactingHashTable.java        |   2 +-
 .../runtime/operators/hash/HashPartition.java      |   6 +-
 .../runtime/operators/hash/InMemoryPartition.java  |   6 +-
 .../operators/hash/InPlaceMutableHashTable.java    |   4 +-
 .../AbstractBlockResettableIterator.java           |   4 +-
 .../resettable/SpillingResettableIterator.java     |   2 +-
 .../SpillingResettableMutableObjectIterator.java   |   2 +-
 .../operators/sort/FixedLengthRecordSorter.java    |   4 +-
 .../operators/sort/NormalizedKeySorter.java        |   4 +-
 .../flink/runtime/io/disk/SpillingBufferTest.java  |   2 +-
 .../network/api/serialization/PagedViewsTest.java  |   4 +-
 flink-table/flink-table-planner/pom.xml            |   8 +
 flink-table/flink-table-runtime/pom.xml            |  16 ++
 .../runtime/hashtable/BinaryHashBucketArea.java    |   2 +-
 .../runtime/hashtable/BinaryHashPartition.java     |   6 +-
 .../table/runtime/hashtable/LongHashPartition.java |   4 +-
 .../sort/AbstractBinaryExternalMerger.java         |   2 +-
 .../operators/sort/BinaryExternalMerger.java       |   2 +-
 .../operators/sort/BinaryIndexedSortable.java      |   4 +-
 .../operators/sort/BinaryKVExternalMerger.java     |   2 +-
 .../operators/sort/BinaryKVInMemorySortBuffer.java |   4 +-
 .../runtime/typeutils/WindowKeySerializer.java     |   4 +-
 .../runtime/util/ResettableExternalBuffer.java     |   2 +-
 .../collections/binary/AbstractBytesHashMap.java   |   4 +-
 .../collections/binary/AbstractBytesMultiMap.java  |   6 +-
 .../apache/flink/table/data/BinaryRowDataTest.java |   4 +-
 .../collections/binary/BytesHashMapTestBase.java   |   2 +-
 flink-table/flink-table-type-utils/pom.xml         | 110 ++++++++++
 .../conversion/ArrayBooleanArrayConverter.java     |   0
 .../data/conversion/ArrayByteArrayConverter.java   |   0
 .../data/conversion/ArrayDoubleArrayConverter.java |   0
 .../data/conversion/ArrayFloatArrayConverter.java  |   0
 .../data/conversion/ArrayIntArrayConverter.java    |   0
 .../table/data/conversion/ArrayListConverter.java  |   0
 .../data/conversion/ArrayLongArrayConverter.java   |   0
 .../data/conversion/ArrayObjectArrayConverter.java |   0
 .../data/conversion/ArrayShortArrayConverter.java  |   0
 .../data/conversion/BitmapBitmapConverter.java     |   0
 .../data/conversion/DataStructureConverter.java    |   0
 .../data/conversion/DataStructureConverters.java   |   0
 .../table/data/conversion/DateDateConverter.java   |   0
 .../data/conversion/DateLocalDateConverter.java    |   0
 .../DayTimeIntervalDurationConverter.java          |   0
 .../conversion/DecimalBigDecimalConverter.java     |   0
 .../table/data/conversion/IdentityConverter.java   |   0
 .../LocalZonedTimestampInstantConverter.java       |   0
 .../LocalZonedTimestampIntConverter.java           |   0
 .../LocalZonedTimestampLongConverter.java          |   0
 .../LocalZonedTimestampTimestampConverter.java     |   0
 .../table/data/conversion/MapMapConverter.java     |   0
 .../data/conversion/RawByteArrayConverter.java     |   0
 .../table/data/conversion/RawObjectConverter.java  |   0
 .../table/data/conversion/RowRowConverter.java     |   0
 .../data/conversion/StringByteArrayConverter.java  |   0
 .../data/conversion/StringStringConverter.java     |   0
 .../data/conversion/StructuredObjectConverter.java |   0
 .../data/conversion/TimeLocalTimeConverter.java    |   0
 .../table/data/conversion/TimeLongConverter.java   |   0
 .../table/data/conversion/TimeTimeConverter.java   |   0
 .../TimestampLocalDateTimeConverter.java           |   0
 .../conversion/TimestampTimestampConverter.java    |   0
 .../YearMonthIntervalPeriodConverter.java          |   0
 .../apache/flink/table/data/util/MapDataUtil.java  |   0
 .../table/data/writer/AbstractBinaryWriter.java    |   0
 .../flink/table/data/writer/BinaryArrayWriter.java |   0
 .../flink/table/data/writer/BinaryRowWriter.java   |   0
 .../flink/table/data/writer/BinaryWriter.java      |   0
 .../table/runtime/generated/CompileUtils.java      | 240 +++++++++++++++++++++
 .../runtime/typeutils/AbstractMapSerializer.java   |   0
 .../typeutils/AbstractRowDataSerializer.java       |   0
 .../runtime/typeutils/ArrayDataSerializer.java     |   0
 .../runtime/typeutils/BinaryRowDataSerializer.java |   4 +-
 .../runtime/typeutils/DecimalDataSerializer.java   |   0
 .../runtime/typeutils/DecimalDataTypeInfo.java     |   0
 .../runtime/typeutils/ExternalSerializer.java      |   0
 .../table/runtime/typeutils/ExternalTypeInfo.java  |   0
 .../runtime/typeutils/InternalSerializers.java     |   0
 .../table/runtime/typeutils/InternalTypeInfo.java  |   0
 .../table/runtime/typeutils/MapDataSerializer.java |   0
 .../runtime/typeutils/PagedTypeSerializer.java     |   4 +-
 .../runtime/typeutils/RawValueDataSerializer.java  |   0
 .../table/runtime/typeutils/RowDataSerializer.java |   4 +-
 .../runtime/typeutils/StringDataSerializer.java    |   0
 .../runtime/typeutils/TimestampDataSerializer.java |   0
 .../table/data/DataStructureConvertersTest.java    |   0
 .../runtime/typeutils/ArrayDataSerializerTest.java |   0
 .../runtime/typeutils/BinaryRowSerializerTest.java |   0
 .../runtime/typeutils/DecimalSerializerTest.java   |   0
 .../runtime/typeutils/ExternalSerializerTest.java  |   0
 .../runtime/typeutils/MapDataSerializerTest.java   |   0
 .../typeutils/RawValueDataSerializerTest.java      |   0
 .../runtime/typeutils/RowDataSerializerTest.java   |   0
 .../typeutils/StringDataSerializerTest.java        |   0
 .../typeutils/TimestampDataSerializerTest.java     |   0
 .../flink/table/utils/RawValueDataAsserter.java    |   0
 flink-table/pom.xml                                |   1 +
 112 files changed, 443 insertions(+), 84 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedInputView.java
similarity index 99%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
rename to 
flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedInputView.java
index b9b23631f67..eb06a16a2c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedInputView.java
@@ -16,10 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.memory;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemorySegment;
+package org.apache.flink.core.memory;
 
 import java.io.EOFException;
 import java.io.IOException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedOutputView.java
similarity index 98%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
rename to 
flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedOutputView.java
index 8d8fdf9c79d..9709ca6d5e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/AbstractPagedOutputView.java
@@ -16,12 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.memory;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentWritable;
+package org.apache.flink.core.memory;
 
 import java.io.IOException;
 import java.io.UTFDataFormatException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/ListMemorySegmentSource.java
similarity index 90%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
rename to 
flink-core/src/main/java/org/apache/flink/core/memory/ListMemorySegmentSource.java
index d21d141316c..502fe1b901b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/ListMemorySegmentSource.java
@@ -16,10 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.memory;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentSource;
+package org.apache.flink.core.memory;
 
 import java.util.List;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessInputView.java
similarity index 93%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
rename to 
flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessInputView.java
index a12cf7704eb..bead6fee2bd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessInputView.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.disk;
+package org.apache.flink.core.memory;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.SeekableDataInputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.util.MathUtils;
 
 import java.io.EOFException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessOutputView.java
similarity index 91%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
rename to 
flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessOutputView.java
index 3ce326e6892..7e348da0cdf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/RandomAccessOutputView.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.disk;
+package org.apache.flink.core.memory;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.SeekableDataOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.util.MathUtils;
 
 import java.io.EOFException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
index f963235b707..67da217fd3d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.io.disk;
 
+import org.apache.flink.core.memory.AbstractPagedInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.util.MathUtils;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
index f415867c281..88564c4b0e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.io.disk;
 
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.runtime.memory.MemoryManager;
 
 import java.io.IOException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
index 1613765b343..d9d1ae128c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.runtime.io.disk;
 
+import org.apache.flink.core.memory.AbstractPagedInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.util.MathUtils;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
index 514d1343812..bb1b437d6bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.io.disk;
 
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentSource;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.util.MathUtils;
 
 import java.io.EOFException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
index 1c576346810..8838b4bcfed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.runtime.io.disk;
 
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentSource;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
 import 
org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 
 import java.io.IOException;
 import java.util.ArrayList;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java
index e9f6cdff775..2ceb54252a4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.AbstractPagedInputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
 
 import java.io.IOException;
 import java.util.List;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java
index 9e8f34d2c91..fd6be9d1a8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 
 import java.io.IOException;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
index 0d7252c3097..d8a8edb1f54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.AbstractPagedInputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
 
 import java.io.EOFException;
 import java.io.IOException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
index 465bd6d9c18..3be7bca27ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 
 import java.io.IOException;
 import java.util.ArrayList;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index f6abefd7299..a731d1e0cd5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.iterative.io;
 
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 
 import java.io.EOFException;
 import java.io.IOException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
index c248052169e..ce2435b5825 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
@@ -21,12 +21,12 @@ package org.apache.flink.runtime.operators;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
 import org.apache.flink.runtime.io.disk.SpillingBuffer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.util.CloseableInputProvider;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 7a999e05ab2..a13291107c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 import org.apache.flink.runtime.util.IntArrayList;
 import org.apache.flink.runtime.util.LongArrayList;
 import org.apache.flink.util.MathUtils;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index f43da4eba24..32c605e51f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -20,17 +20,17 @@ package org.apache.flink.runtime.operators.hash;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentSource;
+import org.apache.flink.core.memory.RandomAccessOutputView;
 import org.apache.flink.core.memory.SeekableDataInputView;
 import org.apache.flink.core.memory.SeekableDataOutputView;
-import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
index d6237a180aa..95ea05d68f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.operators.hash;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentSource;
 import org.apache.flink.core.memory.SeekableDataInputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 
 import java.io.EOFException;
 import java.io.IOException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
index b3736e55774..32de886882a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
@@ -23,10 +23,10 @@ import 
org.apache.flink.api.common.typeutils.SameTypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
index 52a5d140ae8..91caa2450e0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.operators.resettable;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.util.MemoryBlockIterator;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIterator.java
index b421a84647c..a34383ce281 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIterator.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.operators.resettable;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.SpillingBuffer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.util.ResettableIterator;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
index e33ce4fe245..46c88635270 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.operators.resettable;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.SpillingBuffer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index 646e00bb759..76baa914dbd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -20,10 +20,10 @@ package org.apache.flink.runtime.operators.sort;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.util.MutableObjectIterator;
 
 import java.io.EOFException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index 81b5d2481e9..48a12ed2f05 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.operators.sort;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 import org.apache.flink.util.MutableObjectIterator;
 
 import org.slf4j.Logger;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index e600da1c864..4af75e81824 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.io.disk;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.ListMemorySegmentSource;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.MemoryManagerBuilder;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
index 0257725647f..e00b5f11bdd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import 
org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
diff --git a/flink-table/flink-table-planner/pom.xml 
b/flink-table/flink-table-planner/pom.xml
index 27f357eb88c..e37ffce9569 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -224,6 +224,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-type-utils</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java</artifactId>
diff --git a/flink-table/flink-table-runtime/pom.xml 
b/flink-table/flink-table-runtime/pom.xml
index e3cdcaa4f3b..9958fc66f1b 100644
--- a/flink-table/flink-table-runtime/pom.xml
+++ b/flink-table/flink-table-runtime/pom.xml
@@ -52,6 +52,13 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-type-utils</artifactId>
+                       <version>${project.version}</version>
+                       <optional>${flink.markBundledAsOptional}</optional>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-api-java</artifactId>
@@ -131,6 +138,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-type-utils</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-migration-test-utils</artifactId>
@@ -170,6 +185,7 @@ under the License.
                                                                        
<include>org.apache.flink:flink-shaded-jsonpath</include>
                                                                        
<include>org.codehaus.janino:*</include>
                                                                        
<include>org.apache.flink:flink-table-code-splitter</include>
+                                                                       
<include>org.apache.flink:flink-table-type-utils</include>
                                                                </includes>
                                                        </artifactSet>
                                                </configuration>
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
index aa9e6a27ede..82d13257114 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.hashtable;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
 import org.apache.flink.util.MathUtils;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashPartition.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashPartition.java
index 20a42c5e730..c3810ea531e 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashPartition.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashPartition.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.table.runtime.hashtable;
 
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentSource;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.core.memory.SeekableDataInputView;
 import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
 import 
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
 import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
 import org.apache.flink.table.runtime.util.FileChannelUtil;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
index a8d9ad9b02a..7ab438d538f 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.runtime.hashtable;
 
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.SeekableDataInputView;
@@ -27,8 +29,6 @@ import 
org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
 import org.apache.flink.table.runtime.util.FileChannelUtil;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
index cc0a344d890..ab7b3113558 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.table.runtime.operators.sort;
 
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
 import 
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
 import 
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.runtime.io.ChannelWithMeta;
 import org.apache.flink.table.runtime.util.FileChannelUtil;
 import org.apache.flink.util.MutableObjectIterator;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
index 74467c5c761..a0ce9c620be 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.table.runtime.operators.sort;
 
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
 import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
 import 
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryIndexedSortable.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryIndexedSortable.java
index b7072f18175..56d9b5d1c69 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryIndexedSortable.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryIndexedSortable.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.table.runtime.operators.sort;
 
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.runtime.operators.sort.IndexedSortable;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
index db98dfd77fe..9b1a5457c6c 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
@@ -18,10 +18,10 @@
 package org.apache.flink.table.runtime.operators.sort;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
 import 
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
index 38f721e355f..7e4a1c6f4e9 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
@@ -18,9 +18,9 @@
 package org.apache.flink.table.runtime.operators.sort;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
 import org.apache.flink.table.runtime.generated.RecordComparator;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java
index b4175cc7157..9d0cbb57602 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java
@@ -21,10 +21,10 @@ package org.apache.flink.table.runtime.typeutils;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.util.WindowKey;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java
index 9a2746fe316..bca6dc00852 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.util;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
index 61da810d36e..2bf6b35a859 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
@@ -19,11 +19,11 @@
 package org.apache.flink.table.runtime.util.collections.binary;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.AbstractPagedInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
index dd1c9f760f2..b6d1fdfea5e 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.table.runtime.util.collections.binary;
 
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
index 3344997ea03..18b756f9ea9 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
@@ -31,8 +31,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
-import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessOutputView;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.binary.BinaryArrayData;
 import org.apache.flink.table.data.binary.BinaryMapData;
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
index d6620acfa7d..9a1e04aecbb 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
@@ -20,7 +20,7 @@ package 
org.apache.flink.table.runtime.util.collections.binary;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.core.memory.RandomAccessInputView;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.MemoryManagerBuilder;
 import org.apache.flink.table.data.binary.BinaryRowData;
diff --git a/flink-table/flink-table-type-utils/pom.xml 
b/flink-table/flink-table-type-utils/pom.xml
new file mode 100644
index 00000000000..43f0b6e9b29
--- /dev/null
+++ b/flink-table/flink-table-type-utils/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-table</artifactId>
+               <version>2.4-SNAPSHOT</version>
+       </parent>
+
+       <artifactId>flink-table-type-utils</artifactId>
+       <name>Flink : Table : Type Utilities</name>
+       <description>
+               This module contains type utilities and conversion stack for 
the Table API,
+               including data structure converters and serializers.
+       </description>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <!-- Core dependencies -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.codehaus.janino</groupId>
+                       <artifactId>janino</artifactId>
+                       <optional>${flink.markBundledAsOptional}</optional>
+               </dependency>
+               <dependency>
+                       <groupId>org.codehaus.janino</groupId>
+                       <artifactId>commons-compiler</artifactId>
+                       <optional>${flink.markBundledAsOptional}</optional>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.commons</groupId>
+                       <artifactId>commons-lang3</artifactId>
+                       <optional>${flink.markBundledAsOptional}</optional>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-guava</artifactId>
+               </dependency>
+
+               <!-- Test dependencies -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/BitmapBitmapConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/BitmapBitmapConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/BitmapBitmapConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/BitmapBitmapConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampTimestampConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampTimestampConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampTimestampConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampTimestampConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/MapDataUtil.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/util/MapDataUtil.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/MapDataUtil.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/util/MapDataUtil.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
diff --git 
a/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
new file mode 100644
index 00000000000..5ac095a8fe4
--- /dev/null
+++ 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.generated;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava33.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava33.com.google.common.cache.CacheBuilder;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.ExpressionEvaluator;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Utilities to compile a generated code to a Class. */
+public final class CompileUtils {
+
+    // used for logging the generated codes to a same place
+    private static final Logger CODE_LOG = 
LoggerFactory.getLogger(CompileUtils.class);
+
+    /**
+     * Cache of compile, Janino generates a new Class Loader and a new Class 
file every compile
+     * (guaranteeing that the class name will not be repeated). This leads to 
multiple tasks of the
+     * same process that generate a large number of duplicate class, resulting 
in a large number of
+     * Meta zone GC (class unloading), resulting in performance bottlenecks. 
So we add a cache to
+     * avoid this problem.
+     */
+    static final Cache<ClassKey, Class<?>> COMPILED_CLASS_CACHE =
+            CacheBuilder.newBuilder()
+                    // estimated maximum planning/startup time
+                    .expireAfterAccess(Duration.ofMinutes(5))
+                    // estimated cache size
+                    .maximumSize(300)
+                    .softValues()
+                    .build();
+
+    static final Cache<ExpressionKey, ExpressionEvaluator> 
COMPILED_EXPRESSION_CACHE =
+            CacheBuilder.newBuilder()
+                    // estimated maximum planning/startup time
+                    .expireAfterAccess(Duration.ofMinutes(5))
+                    // estimated cache size
+                    .maximumSize(100)
+                    .softValues()
+                    .build();
+
+    /** Triggers internal garbage collection of expired cache entries. */
+    public static void cleanUp() {
+        COMPILED_CLASS_CACHE.cleanUp();
+        COMPILED_EXPRESSION_CACHE.cleanUp();
+    }
+
+    /**
+     * Compiles a generated code to a Class.
+     *
+     * @param cl the ClassLoader used to load the class
+     * @param name the class name
+     * @param code the generated code
+     * @param <T> the class type
+     * @return the compiled class
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Class<T> compile(ClassLoader cl, String name, String 
code) {
+        try {
+            // The class name is part of the "code" and makes the string 
unique,
+            // to prevent class leaks we don't cache the class loader directly
+            // but only its hash code
+            final ClassKey classKey = new ClassKey(cl.hashCode(), code);
+            return (Class<T>) COMPILED_CLASS_CACHE.get(classKey, () -> 
doCompile(cl, name, code));
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(e.getMessage(), e);
+        }
+    }
+
+    private static <T> Class<T> doCompile(ClassLoader cl, String name, String 
code) {
+        checkNotNull(cl, "Classloader must not be null.");
+        CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code);
+        SimpleCompiler compiler = new SimpleCompiler();
+        compiler.setParentClassLoader(cl);
+        try {
+            compiler.cook(code);
+        } catch (Throwable t) {
+            System.out.println(addLineNumber(code));
+            throw new InvalidProgramException(
+                    "Table program cannot be compiled. This is a bug. Please 
file an issue.", t);
+        }
+        try {
+            //noinspection unchecked
+            return (Class<T>) compiler.getClassLoader().loadClass(name);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Can not load class " + name, e);
+        }
+    }
+
+    /**
+     * To output more information when an error occurs. Generally, when cook 
fails, it shows which
+     * line is wrong. This line number starts at 1.
+     */
+    private static String addLineNumber(String code) {
+        String[] lines = code.split("\n");
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < lines.length; i++) {
+            builder.append("/* ").append(i + 1).append(" 
*/").append(lines[i]).append("\n");
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Compiles an expression code to a janino {@link ExpressionEvaluator}.
+     *
+     * @param code the expression code
+     * @param argumentNames the expression argument names
+     * @param argumentClasses the expression argument classes
+     * @param returnClass the return type of the expression
+     * @return the compiled class
+     */
+    public static ExpressionEvaluator compileExpression(
+            String code,
+            List<String> argumentNames,
+            List<Class<?>> argumentClasses,
+            Class<?> returnClass) {
+        try {
+            ExpressionKey key =
+                    new ExpressionKey(code, argumentNames, argumentClasses, 
returnClass);
+            return COMPILED_EXPRESSION_CACHE.get(
+                    key,
+                    () -> {
+                        ExpressionEvaluator expressionEvaluator = new 
ExpressionEvaluator();
+                        // Input args
+                        expressionEvaluator.setParameters(
+                                argumentNames.toArray(new String[0]),
+                                argumentClasses.toArray(new Class[0]));
+                        // Result type
+                        expressionEvaluator.setExpressionType(returnClass);
+                        try {
+                            // Compile
+                            expressionEvaluator.cook(code);
+                        } catch (CompileException e) {
+                            throw new InvalidProgramException(
+                                    "Table program cannot be compiled. This is 
a bug. Please file an issue.\nExpression: "
+                                            + code,
+                                    e);
+                        }
+                        return expressionEvaluator;
+                    });
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(e.getMessage(), e);
+        }
+    }
+
+    /** Class to use as key for the {@link #COMPILED_CLASS_CACHE}. */
+    private static class ClassKey {
+        private final int classLoaderId;
+        private final String code;
+
+        private ClassKey(int classLoaderId, String code) {
+            this.classLoaderId = classLoaderId;
+            this.code = code;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ClassKey classKey = (ClassKey) o;
+            return classLoaderId == classKey.classLoaderId && 
code.equals(classKey.code);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(classLoaderId, code);
+        }
+    }
+
+    /** Class to use as key for the {@link #COMPILED_EXPRESSION_CACHE}. */
+    private static class ExpressionKey {
+        private final String code;
+        private final List<String> argumentNames;
+        private final List<Class<?>> argumentClasses;
+        private final Class<?> returnClass;
+
+        private ExpressionKey(
+                String code,
+                List<String> argumentNames,
+                List<Class<?>> argumentClasses,
+                Class<?> returnClass) {
+            this.code = code;
+            this.argumentNames = argumentNames;
+            this.argumentClasses = argumentClasses;
+            this.returnClass = returnClass;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ExpressionKey that = (ExpressionKey) o;
+            return code.equals(that.code)
+                    && argumentNames.equals(that.argumentNames)
+                    && argumentClasses.equals(that.argumentClasses)
+                    && returnClass.equals(that.returnClass);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(code, argumentNames, argumentClasses, 
returnClass);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractMapSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractMapSerializer.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractMapSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractMapSerializer.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractRowDataSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractRowDataSerializer.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractRowDataSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/AbstractRowDataSerializer.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
similarity index 99%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
index 324482b8da5..6df8726ed4b 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
+++ 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
@@ -22,13 +22,13 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemorySegmentWritable;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.binary.BinarySegmentUtils;
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataTypeInfo.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataTypeInfo.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataTypeInfo.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataTypeInfo.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/InternalSerializers.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/InternalSerializers.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/InternalSerializers.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/InternalSerializers.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/InternalTypeInfo.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/InternalTypeInfo.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/InternalTypeInfo.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/InternalTypeInfo.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
similarity index 97%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
index 8227a862e2b..c5f0ca5fe85 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
+++ 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.runtime.typeutils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.table.data.binary.BinaryRowData;
 
 import java.io.IOException;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
similarity index 99%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
index a83ea7a5e55..7a634f67efe 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
+++ 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
@@ -26,10 +26,10 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.AbstractPagedInputView;
+import org.apache.flink.core.memory.AbstractPagedOutputView;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/StringDataSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/StringDataSerializer.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/StringDataSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/StringDataSerializer.java
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializer.java
 
b/flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializer.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializer.java
rename to 
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializer.java
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
 
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
rename to 
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
 
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
rename to 
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryRowSerializerTest.java
 
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryRowSerializerTest.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryRowSerializerTest.java
rename to 
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryRowSerializerTest.java
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/DecimalSerializerTest.java
 
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/DecimalSerializerTest.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/DecimalSerializerTest.java
rename to 
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/DecimalSerializerTest.java
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ExternalSerializerTest.java
 
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/ExternalSerializerTest.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ExternalSerializerTest.java
rename to 
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/ExternalSerializerTest.java
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
 
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
rename to 
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializerTest.java
 
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializerTest.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializerTest.java
rename to 
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializerTest.java
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
 
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
rename to 
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/StringDataSerializerTest.java
 
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/StringDataSerializerTest.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/StringDataSerializerTest.java
rename to 
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/StringDataSerializerTest.java
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializerTest.java
 
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializerTest.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializerTest.java
rename to 
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializerTest.java
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/utils/RawValueDataAsserter.java
 
b/flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/utils/RawValueDataAsserter.java
similarity index 100%
rename from 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/utils/RawValueDataAsserter.java
rename to 
flink-table/flink-table-type-utils/src/test/java/org/apache/flink/table/utils/RawValueDataAsserter.java
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index ac72b7ed37a..1cc2ba7b79f 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -33,6 +33,7 @@ under the License.
 
        <modules>
                <module>flink-table-common</module>
+               <module>flink-table-type-utils</module>
                <module>flink-table-api-java</module>
                <module>flink-table-api-scala</module>
                <module>flink-table-api-bridge-base</module>

Reply via email to