This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 63bdbc9d4a2b2c025e450cd1cf6e6008940389de
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Mon Sep 21 15:05:35 2020 +0200

    [FLINK-19472] Implement a one input sorting DataInput
    
    This commit implements a SortingDataInput which can wrap a regular
    network input and sort the incoming records before passing them on to
    the DataOutput. The sorting input performs the final sort when it
    receives an InputStatus.END_OF_INPUT from the chained input.
    
    The sorter uses a binary comparison of the serialized keys of the
    incoming records. It uses the first n bytes of the serialized key as a
    normalized key.
    
    Watermarks, stream statuses, and latency markers are not propagated;
    only the largest watermark seen is emitted after all records.
    
    This closes #13521
---
 .../api/common/typeutils/ComparatorTestBase.java   |  66 +++--
 .../operators/sort/NormalizedKeySorter.java        |   2 +-
 .../runtime/operators/sort/SpillingThread.java     |  10 +-
 .../operators/sort/BytesKeyNormalizationUtil.java  |  85 +++++++
 .../sort/FixedLengthByteKeyComparator.java         | 178 ++++++++++++++
 .../api/operators/sort/KeyAndValueSerializer.java  | 188 +++++++++++++++
 .../api/operators/sort/SortingDataInput.java       | 229 ++++++++++++++++++
 .../sort/VariableLengthByteKeyComparator.java      | 182 ++++++++++++++
 .../sort/FixedLengthByteKeyComparatorTest.java     |  70 ++++++
 .../sort/FixedLengthKeyAndValueSerializerTest.java |  73 ++++++
 .../sort/SerializerComparatorTestData.java         | 109 +++++++++
 .../api/operators/sort/SortingDataInputITCase.java | 243 +++++++++++++++++++
 .../api/operators/sort/SortingDataInputTest.java   | 268 +++++++++++++++++++++
 .../sort/VariableLengthByteKeyComparatorTest.java  |  68 ++++++
 .../VariableLengthKeyAndValueSerializerTest.java   |  70 ++++++
 15 files changed, 1812 insertions(+), 29 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
index 279c2b3..fd3581c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
@@ -18,14 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.junit.Assert.*;
-
+import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
@@ -35,6 +28,17 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Abstract test base for comparators.
  *
@@ -45,8 +49,8 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
        // Same as in the NormalizedKeySorter
        private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 8;
 
-       protected boolean[] getTestedOrder() {
-               return new boolean[] {true, false};
+       protected Order[] getTestedOrder() {
+               return new Order[] {Order.ASCENDING, Order.DESCENDING};
        }
 
        protected abstract TypeComparator<T> createComparator(boolean ascending);
@@ -67,7 +71,8 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
        @Test
        public void testDuplicate() {
                try {
-                       TypeComparator<T> comparator = getComparator(true);
+                       boolean ascending = isAscending(getTestedOrder()[0]);
+                       TypeComparator<T> comparator = getComparator(ascending);
                        TypeComparator<T> clone = comparator.duplicate();
                        
                        T[] data = getSortedData();
@@ -88,9 +93,9 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
        
        @Test
        public void testEquality() {
-               for (boolean ascending : getTestedOrder()) {
-                       testEquals(true);
-                       testEquals(false);
+               for (Order order : getTestedOrder()) {
+                       boolean ascending = isAscending(order);
+                       testEquals(ascending);
                }
        }
 
@@ -128,8 +133,9 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
        public void testEqualityWithReference() {
                try {
                        TypeSerializer<T> serializer = createSerializer();
-                       TypeComparator<T> comparator = getComparator(true);
-                       TypeComparator<T> comparator2 = getComparator(true);
+                       boolean ascending = isAscending(getTestedOrder()[0]);
+                       TypeComparator<T> comparator = getComparator(ascending);
+                       TypeComparator<T> comparator2 = getComparator(ascending);
                        T[] data = getSortedData();
                        for (T d : data) {
                                comparator.setReference(d);
@@ -151,7 +157,8 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
        // --------------------------------- inequality tests ----------------------------------------
        @Test
        public void testInequality() {
-               for (boolean ascending : getTestedOrder()) {
+               for (Order order : getTestedOrder()) {
+                       boolean ascending = isAscending(order);
                        testGreatSmallAscDesc(ascending, true);
                        testGreatSmallAscDesc(ascending, false);
                }
@@ -202,7 +209,8 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
 
        @Test
        public void testInequalityWithReference() {
-               for (boolean ascending : getTestedOrder()) {
+               for (Order order : getTestedOrder()) {
+                       boolean ascending = isAscending(order);
                        testGreatSmallAscDescWithReference(ascending, true);
                        testGreatSmallAscDescWithReference(ascending, false);
                }
@@ -276,7 +284,8 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
        @Test
        public void testNormalizedKeysEqualsFullLength() {
                // Ascending or descending does not matter in this case
-               TypeComparator<T> comparator = getComparator(true);
+               boolean ascending = isAscending(getTestedOrder()[0]);
+               TypeComparator<T> comparator = getComparator(ascending);
                if (!comparator.supportsNormalizedKey()) {
                        return;
                }
@@ -285,7 +294,8 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
 
        @Test
        public void testNormalizedKeysEqualsHalfLength() {
-               TypeComparator<T> comparator = getComparator(true);
+               boolean ascending = isAscending(getTestedOrder()[0]);
+               TypeComparator<T> comparator = getComparator(ascending);
                if (!comparator.supportsNormalizedKey()) {
                        return;
                }
@@ -294,7 +304,8 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
        
        public void testNormalizedKeysEquals(boolean halfLength) {
                try {
-                       TypeComparator<T> comparator = getComparator(true);
+                       boolean ascending = isAscending(getTestedOrder()[0]);
+                       TypeComparator<T> comparator = getComparator(ascending);
                        T[] data = getSortedData();
                        int normKeyLen = getNormKeyLen(halfLength, data, comparator);
 
@@ -315,7 +326,7 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
        @Test
        public void testNormalizedKeysGreatSmallFullLength() {
                // ascending/descending in comparator doesn't matter for normalized keys
-               boolean ascending = getTestedOrder()[0];
+               boolean ascending = isAscending(getTestedOrder()[0]);
                TypeComparator<T> comparator = getComparator(ascending);
                if (!comparator.supportsNormalizedKey()) {
                        return;
@@ -327,7 +338,7 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
        @Test
        public void testNormalizedKeysGreatSmallAscDescHalfLength() {
                // ascending/descending in comparator doesn't matter for normalized keys
-               boolean ascending = getTestedOrder()[0];
+               boolean ascending = isAscending(getTestedOrder()[0]);
                TypeComparator<T> comparator = getComparator(ascending);
                if (!comparator.supportsNormalizedKey()) {
                        return;
@@ -382,7 +393,7 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
                        T[] data = getSortedData();
                        T reuse = getSortedData()[0];
 
-                       boolean ascending = getTestedOrder()[0];
+                       boolean ascending = isAscending(getTestedOrder()[0]);
                        TypeComparator<T> comp1 = getComparator(ascending);
                        if(!comp1.supportsSerializationWithKeyNormalization()){
                                return;
@@ -413,7 +424,7 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
        @Test
        @SuppressWarnings("unchecked")
        public void testKeyExtraction() {
-               boolean ascending = getTestedOrder()[0];
+               boolean ascending = isAscending(getTestedOrder()[0]);
                TypeComparator<T> comparator = getComparator(ascending);
                T[] data = getSortedData();
 
@@ -491,6 +502,11 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
        }
 
        // --------------------------------------------------------------------------------------------
+
+       private static boolean isAscending(Order order) {
+               return order == Order.ASCENDING;
+       }
+
        public static final class TestOutputView extends DataOutputStream implements DataOutputView {
 
                public TestOutputView() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index 512e73e..8043521 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -531,7 +531,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
                        }
                }
        }
-       
+
        /**
         * Writes a subset of the records in this buffer in their logical order to the given output.
         * 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SpillingThread.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SpillingThread.java
index 0c52fae..3f2d059 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SpillingThread.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SpillingThread.java
@@ -310,9 +310,13 @@ final class SpillingThread<E> extends ThreadBase<E> {
                disposeSortBuffers(true);
 
                // set lazy iterator
-               this.dispatcher.sendResult(iterators.isEmpty() ? EmptyMutableObjectIterator.get() :
-                       iterators.size() == 1 ? iterators.get(0) :
-                               new MergeIterator<>(iterators, this.comparator));
+               if (iterators.isEmpty()) {
+                       this.dispatcher.sendResult(EmptyMutableObjectIterator.get());
+               } else if (iterators.size() == 1) {
+                       this.dispatcher.sendResult(iterators.get(0));
+               } else {
+                       this.dispatcher.sendResult(new MergeIterator<>(iterators, this.comparator));
+               }
        }
 
        private List<ChannelWithBlockCount> startSpilling(Queue<CircularElement<E>> cache) throws IOException, InterruptedException {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/BytesKeyNormalizationUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/BytesKeyNormalizationUtil.java
new file mode 100644
index 0000000..ef0acf7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/BytesKeyNormalizationUtil.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.streaming.api.operators.sort.FixedLengthByteKeyComparator.TIMESTAMP_BYTE_SIZE;
+
+/**
+ * Utility class for common key normalization used both in {@link VariableLengthByteKeyComparator}
+ * and {@link FixedLengthByteKeyComparator}.
+ */
+final class BytesKeyNormalizationUtil {
+       /**
+        * Writes the normalized key of the given record. The normalized key consists of the key
+        * serialized as bytes and the timestamp of the record.
+        *
+        * <p>NOTE: The key does not represent a logical order. It can be used only for grouping keys!
+        */
+       static <IN> void putNormalizedKey(
+                       Tuple2<byte[], StreamRecord<IN>> record,
+                       int dataLength,
+                       MemorySegment target,
+                       int offset,
+                       int numBytes) {
+               byte[] data = record.f0;
+
+               if (dataLength >= numBytes) {
+                       putBytesArray(target, offset, numBytes, data);
+               } else {
+                       // whole key fits into the normalized key
+                       putBytesArray(target, offset, dataLength, data);
+                       int lastOffset = offset + numBytes;
+                       offset += dataLength;
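+                       // Shifting the timestamp by Long.MIN_VALUE below maps the signed long onto
+                       // an unsigned range, so byte-wise comparison of the normalized key preserves
+                       // the numeric order of timestamps.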
+                       long valueOfTimestamp = record.f1.asRecord().getTimestamp() - Long.MIN_VALUE;
+                       if (dataLength + TIMESTAMP_BYTE_SIZE <= numBytes) {
+                               // whole timestamp fits into the normalized key
+                               target.putLong(offset, valueOfTimestamp);
+                               offset += TIMESTAMP_BYTE_SIZE;
+                               // fill in the remaining space with zeros
+                               while (offset < lastOffset) {
+                                       target.put(offset++, (byte) 0);
+                               }
+                       } else {
+                               // only part of the timestamp fits into normalized key
+                               for (int i = 0; offset < lastOffset; offset++, i++) {
+                                       target.put(offset, (byte) (valueOfTimestamp >>> ((7 - i) << 3)));
+                               }
+                       }
+               }
+       }
+
+       private static void putBytesArray(MemorySegment target, int offset, int numBytes, byte[] data) {
+               for (int i = 0; i < numBytes; i++) {
+                       // We're converting the signed byte in data into an unsigned representation.
+                       // A Java byte goes from -128 to 127, i.e. is signed. By subtracting Byte.MIN_VALUE
+                       // (-128) here we're shifting the number to be from 0 to 255. The normalized key
+                       // sorter sorts bytes as "unsigned", so we need to convert here to maintain a
+                       // correct ordering.
+                       int highByte = data[i] & 0xff;
+                       highByte -= Byte.MIN_VALUE;
+                       target.put(offset + i, (byte) highByte);
+               }
+       }
+
+       private BytesKeyNormalizationUtil() {
+       }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/FixedLengthByteKeyComparator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/FixedLengthByteKeyComparator.java
new file mode 100644
index 0000000..6b56c24
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/FixedLengthByteKeyComparator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A comparator used in {@link SortingDataInput} which compares record keys and timestamps.
+ * It uses the binary format produced by the {@link KeyAndValueSerializer}.
+ *
+ * <p>It assumes keys are always of a fixed length and thus the key length is not serialized.
+ */
+final class FixedLengthByteKeyComparator<IN> extends TypeComparator<Tuple2<byte[], StreamRecord<IN>>> {
+       static final int TIMESTAMP_BYTE_SIZE = 8;
+       private final int keyLength;
+       private byte[] keyReference;
+       private long timestampReference;
+
+       FixedLengthByteKeyComparator(int keyLength) {
+               this.keyLength = keyLength;
+       }
+
+       @Override
+       public int hash(Tuple2<byte[], StreamRecord<IN>> record) {
+               return record.hashCode();
+       }
+
+       @Override
+       public void setReference(Tuple2<byte[], StreamRecord<IN>> toCompare) {
+               this.keyReference = toCompare.f0;
+               this.timestampReference = toCompare.f1.asRecord().getTimestamp();
+       }
+
+       @Override
+       public boolean equalToReference(Tuple2<byte[], StreamRecord<IN>> candidate) {
+               return Arrays.equals(keyReference, candidate.f0) &&
+                       timestampReference == candidate.f1.asRecord().getTimestamp();
+       }
+
+       @Override
+       public int compareToReference(TypeComparator<Tuple2<byte[], StreamRecord<IN>>> referencedComparator) {
+               byte[] otherKey = ((FixedLengthByteKeyComparator<IN>) referencedComparator).keyReference;
+               long otherTimestamp = ((FixedLengthByteKeyComparator<IN>) referencedComparator).timestampReference;
+
+               int keyCmp = compare(otherKey, this.keyReference);
+               if (keyCmp != 0) {
+                       return keyCmp;
+               }
+               return Long.compare(otherTimestamp, this.timestampReference);
+       }
+
+       @Override
+       public int compare(
+                       Tuple2<byte[], StreamRecord<IN>> first,
+                       Tuple2<byte[], StreamRecord<IN>> second) {
+               int keyCmp = compare(first.f0, second.f0);
+               if (keyCmp != 0) {
+                       return keyCmp;
+               }
+               return Long.compare(first.f1.asRecord().getTimestamp(), second.f1.asRecord().getTimestamp());
+       }
+
+       private int compare(byte[] first, byte[] second) {
+               for (int i = 0; i < keyLength; i++) {
+                       int cmp = Byte.compare(first[i], second[i]);
+
+                       if (cmp != 0) {
+                               return cmp < 0 ? -1 : 1;
+                       }
+               }
+
+               return 0;
+       }
+
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
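+               // Keys are fixed-length, so there is no length header to read; compare the
+               // key bytes first and fall back to the timestamps on a tie.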
+               int minCount = keyLength;
+               while (minCount-- > 0) {
+                       byte firstValue = firstSource.readByte();
+                       byte secondValue = secondSource.readByte();
+
+                       int cmp = Byte.compare(firstValue, secondValue);
+                       if (cmp != 0) {
+                               return cmp < 0 ? -1 : 1;
+                       }
+               }
+
+               return Long.compare(firstSource.readLong(), secondSource.readLong());
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return true;
+       }
+
+       @Override
+       public int getNormalizeKeyLen() {
+               return keyLength + TIMESTAMP_BYTE_SIZE;
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return keyBytes < getNormalizeKeyLen();
+       }
+
+       @Override
+       public void putNormalizedKey(Tuple2<byte[], StreamRecord<IN>> record, MemorySegment target, int offset, int numBytes) {
+               BytesKeyNormalizationUtil.putNormalizedKey(record, keyLength, target, offset, numBytes);
+       }
+
+       @Override
+       public boolean invertNormalizedKey() {
+               return false;
+       }
+
+       @Override
+       public TypeComparator<Tuple2<byte[], StreamRecord<IN>>> duplicate() {
+               return new FixedLengthByteKeyComparator<>(this.keyLength);
+       }
+
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
+       }
+
+       @Override
+       public TypeComparator<?>[] getFlatComparators() {
+               return new TypeComparator[] {this};
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // unsupported normalization
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       @Override
+       public void writeWithKeyNormalization(
+               Tuple2<byte[], StreamRecord<IN>> record,
+               DataOutputView target) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Tuple2<byte[], StreamRecord<IN>> readWithKeyDenormalization(
+               Tuple2<byte[], StreamRecord<IN>> reuse,
+               DataInputView source) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/KeyAndValueSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/KeyAndValueSerializer.java
new file mode 100644
index 0000000..311da3e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/KeyAndValueSerializer.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A serializer used in {@link SortingDataInput} for serializing elements alongside their key and
+ * timestamp. It serializes the record in a format known by the {@link FixedLengthByteKeyComparator}
+ * and {@link VariableLengthByteKeyComparator}.
+ *
+ * <p>If the key is of a known constant length, the key length is not serialized with the data.
+ * The serialized data is laid out as follows (the bracketed key-length is written only for
+ * variable-length keys):
+ *
+ * <pre>
+ *      [key-length] | &lt;key&gt; | &lt;timestamp&gt; | &lt;record&gt;
+ * </pre>
+ */
+final class KeyAndValueSerializer<IN> extends TypeSerializer<Tuple2<byte[], StreamRecord<IN>>> {
+       private static final int TIMESTAMP_LENGTH = 8;
+       private final TypeSerializer<IN> valueSerializer;
+
+       // This represents either a variable length (-1) or a fixed one (>= 0).
+       private final int serializedKeyLength;
+
+       KeyAndValueSerializer(TypeSerializer<IN> valueSerializer, int serializedKeyLength) {
+               this.valueSerializer = valueSerializer;
+               this.serializedKeyLength = serializedKeyLength;
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<Tuple2<byte[], StreamRecord<IN>>> duplicate() {
+               return new KeyAndValueSerializer<>(valueSerializer.duplicate(), this.serializedKeyLength);
+       }
+
+       @Override
+       public Tuple2<byte[], StreamRecord<IN>> copy(Tuple2<byte[], StreamRecord<IN>> from) {
+               StreamRecord<IN> fromRecord = from.f1;
+               return Tuple2.of(
+                       Arrays.copyOf(from.f0, from.f0.length),
+                       fromRecord.copy(valueSerializer.copy(fromRecord.getValue()))
+               );
+       }
+
+       @Override
+       public Tuple2<byte[], StreamRecord<IN>> createInstance() {
+               return Tuple2.of(new byte[0], new StreamRecord<>(valueSerializer.createInstance()));
+       }
+
+       @Override
+       public Tuple2<byte[], StreamRecord<IN>> copy(
+                       Tuple2<byte[], StreamRecord<IN>> from,
+                       Tuple2<byte[], StreamRecord<IN>> reuse) {
+               StreamRecord<IN> fromRecord = from.f1;
+               StreamRecord<IN> reuseRecord = reuse.f1;
+
+               IN valueCopy = valueSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
+               fromRecord.copyTo(valueCopy, reuseRecord);
+               reuse.f0 = Arrays.copyOf(from.f0, from.f0.length);
+               reuse.f1 = reuseRecord;
+               return reuse;
+       }
+
+       @Override
+       public int getLength() {
+               if (valueSerializer.getLength() < 0 || serializedKeyLength < 0) {
+                       return -1;
+               }
+               return valueSerializer.getLength() + serializedKeyLength + TIMESTAMP_LENGTH;
+       }
+
+       @Override
+       public void serialize(Tuple2<byte[], StreamRecord<IN>> record, DataOutputView target) throws IOException {
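+               // A negative serializedKeyLength marks a variable-length key, which is written
+               // with a leading length header.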
+               if (serializedKeyLength < 0) {
+                       target.writeInt(record.f0.length);
+               }
+               target.write(record.f0);
+               StreamRecord<IN> toSerialize = record.f1;
+               target.writeLong(toSerialize.getTimestamp());
+               valueSerializer.serialize(toSerialize.getValue(), target);
+       }
+
+       @Override
+       public Tuple2<byte[], StreamRecord<IN>> deserialize(DataInputView source) throws IOException {
+               final int length = getKeyLength(source);
+               byte[] bytes = new byte[length];
+               source.read(bytes);
+               long timestamp = source.readLong();
+               IN value = valueSerializer.deserialize(source);
+               return Tuple2.of(
+                       bytes,
+                       new StreamRecord<>(value, timestamp)
+               );
+       }
+
+       @Override
+       public Tuple2<byte[], StreamRecord<IN>> deserialize(Tuple2<byte[], StreamRecord<IN>> reuse, DataInputView source) throws IOException {
+               final int length = getKeyLength(source);
+               byte[] bytes = new byte[length];
+               source.read(bytes);
+               long timestamp = source.readLong();
+               IN value = valueSerializer.deserialize(source);
+               StreamRecord<IN> reuseRecord = reuse.f1;
+               reuseRecord.replace(value, timestamp);
+               reuse.f0 = bytes;
+               reuse.f1 = reuseRecord;
+               return reuse;
+       }
+
+       private int getKeyLength(DataInputView source) throws IOException {
+               final int length;
+               if (serializedKeyLength < 0) {
+                       length = source.readInt();
+               } else {
+                       length = serializedKeyLength;
+               }
+               return length;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               final int length;
+               if (serializedKeyLength < 0) {
+                       length = source.readInt();
+                       target.writeInt(length);
+               } else {
+                       length = serializedKeyLength;
+               }
+               for (int i = 0; i < length; i++) {
+                       target.writeByte(source.readByte());
+               }
+               target.writeLong(source.readLong());
+               valueSerializer.copy(source, target);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               KeyAndValueSerializer<?> that = (KeyAndValueSerializer<?>) o;
+               return Objects.equals(valueSerializer, that.valueSerializer);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(valueSerializer);
+       }
+
+       @Override
+       public TypeSerializerSnapshot<Tuple2<byte[], StreamRecord<IN>>> snapshotConfiguration() {
+               throw new UnsupportedOperationException(
+                       "The KeyAndValueSerializer should not be used for persisting into State!");
+       }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
new file mode 100644
index 0000000..b59a404
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link StreamTaskInput} which sorts the incoming records from a chained input. It postpones
+ * emitting the records until it receives {@link InputStatus#END_OF_INPUT} from the chained input.
+ * After it is done, it emits a single record at a time from the sorter.
+ *
+ * <p>The sorter uses binary comparison of keys, which are extracted and serialized when received
+ * from the chained input. Moreover, the timestamps of incoming records are used for secondary
+ * ordering. For the comparison it uses either {@link FixedLengthByteKeyComparator} if the length
+ * of the serialized key is constant, or {@link VariableLengthByteKeyComparator} otherwise.
+ *
+ * <p>Watermarks, stream statuses, and latency markers are not propagated downstream, as they do
+ * not make sense with buffered records. The input emits the largest watermark seen after all
+ * records.
+ *
+ * @param <T> The type of the value in incoming {@link StreamRecord StreamRecords}.
+ * @param <K> The type of the key.
+ */
+public final class SortingDataInput<T, K> implements StreamTaskInput<T> {
+
+       private final StreamTaskInput<T> wrappedInput;
+       private final PushSorter<Tuple2<byte[], StreamRecord<T>>> sorter;
+       private final KeySelector<T, K> keySelector;
+       private final TypeSerializer<K> keySerializer;
+       private final DataOutputSerializer dataOutputSerializer;
+       private final ForwardingDataOutput forwardingDataOutput;
+       private MutableObjectIterator<Tuple2<byte[], StreamRecord<T>>> sortedInput = null;
+       private boolean emittedLast;
+       private long watermarkSeen = Long.MIN_VALUE;
+
+       public SortingDataInput(
+                       StreamTaskInput<T> wrappedInput,
+                       TypeSerializer<T> typeSerializer,
+                       TypeSerializer<K> keySerializer,
+                       KeySelector<T, K> keySelector,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       boolean objectReuse,
+                       double managedMemoryFraction,
+                       Configuration jobConfiguration,
+                       AbstractInvokable containingTask) {
+               try {
+                       this.forwardingDataOutput = new ForwardingDataOutput();
+                       this.keySelector = keySelector;
+                       this.keySerializer = keySerializer;
+                       int keyLength = keySerializer.getLength();
+                       final TypeComparator<Tuple2<byte[], StreamRecord<T>>> comparator;
+                       if (keyLength > 0) {
+                               this.dataOutputSerializer = new DataOutputSerializer(keyLength);
+                               comparator = new FixedLengthByteKeyComparator<>(keyLength);
+                       } else {
+                               this.dataOutputSerializer = new DataOutputSerializer(64);
+                               comparator = new VariableLengthByteKeyComparator<>();
+                       }
+                       KeyAndValueSerializer<T> keyAndValueSerializer = new KeyAndValueSerializer<>(typeSerializer, keyLength);
+                       this.wrappedInput = wrappedInput;
+                       this.sorter = ExternalSorter.newBuilder(
+                                       memoryManager,
+                                       containingTask,
+                                       keyAndValueSerializer,
+                                       comparator)
+                               .memoryFraction(managedMemoryFraction)
+                               .enableSpilling(
+                                       ioManager,
+                                       jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+                               .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
+                               .objectReuse(objectReuse)
+                               .largeRecords(true)
+                               .build();
+               } catch (MemoryAllocationException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @Override
+       public int getInputIndex() {
+               return wrappedInput.getInputIndex();
+       }
+
+       @Override
+       public CompletableFuture<Void> prepareSnapshot(
+                       ChannelStateWriter channelStateWriter,
+                       long checkpointId) {
+               throw new UnsupportedOperationException("Checkpoints are not supported for sorting inputs");
+       }
+
+       @Override
+       public void close() throws IOException {
+               IOException ex = null;
+               try {
+                       wrappedInput.close();
+               } catch (IOException e) {
+                       ex = ExceptionUtils.firstOrSuppressed(e, ex);
+               }
+
+               try {
+                       sorter.close();
+               } catch (IOException e) {
+                       ex = ExceptionUtils.firstOrSuppressed(e, ex);
+               }
+
+               if (ex != null) {
+                       throw ex;
+               }
+       }
+
+       private class ForwardingDataOutput implements DataOutput<T> {
+               @Override
+               public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
+                       K key = keySelector.getKey(streamRecord.getValue());
+
+                       keySerializer.serialize(key, dataOutputSerializer);
+                       byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer();
+                       dataOutputSerializer.clear();
+
+                       sorter.writeRecord(Tuple2.of(serializedKey, streamRecord));
+               }
+
+               @Override
+               public void emitWatermark(Watermark watermark) {
+                       watermarkSeen = Math.max(watermarkSeen, watermark.getTimestamp());
+               }
+
+               @Override
+               public void emitStreamStatus(StreamStatus streamStatus) {
+
+               }
+
+               @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) {
+
+               }
+       }
+
+       @Override
+       public InputStatus emitNext(DataOutput<T> output) throws Exception {
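+               // Once the sorted iterator exists the wrapped input has reached
+               // END_OF_INPUT, so records are served from the sorter instead.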
+               if (sortedInput != null) {
+                       return emitNextSortedRecord(output);
+               }
+
+               InputStatus inputStatus = wrappedInput.emitNext(forwardingDataOutput);
+               if (inputStatus == InputStatus.END_OF_INPUT) {
+                       endSorting();
+                       return emitNextSortedRecord(output);
+               }
+
+               return inputStatus;
+       }
+
+       @Nonnull
+       private InputStatus emitNextSortedRecord(DataOutput<T> output) throws Exception {
+               if (emittedLast) {
+                       return InputStatus.END_OF_INPUT;
+               }
+
+               Tuple2<byte[], StreamRecord<T>> next = sortedInput.next();
+               if (next != null) {
+                       output.emitRecord(next.f1);
+                       return InputStatus.MORE_AVAILABLE;
+               } else {
+                       emittedLast = true;
+                       if (watermarkSeen > Long.MIN_VALUE) {
+                               output.emitWatermark(new Watermark(watermarkSeen));
+                       }
+                       return InputStatus.END_OF_INPUT;
+               }
+       }
+
+       private void endSorting() throws Exception {
+               this.sorter.finishReading();
+               this.sortedInput = sorter.getIterator();
+       }
+
+       @Override
+       public CompletableFuture<?> getAvailableFuture() {
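+               // After sorting has finished, records come from the local sorter and the
+               // input is always immediately available.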
+               if (sortedInput != null) {
+                       return AvailabilityProvider.AVAILABLE;
+               } else {
+                       return wrappedInput.getAvailableFuture();
+               }
+       }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/VariableLengthByteKeyComparator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/VariableLengthByteKeyComparator.java
new file mode 100644
index 0000000..9b60506
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/VariableLengthByteKeyComparator.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A comparator used in {@link SortingDataInput} which compares record keys and timestamps.
+ * It uses the binary format produced by the {@link KeyAndValueSerializer}.
+ *
+ * <p>It assumes keys are of a variable length and thus expects the length of the key to be serialized.
+ */
+final class VariableLengthByteKeyComparator<IN> extends TypeComparator<Tuple2<byte[], StreamRecord<IN>>> {
+       private byte[] keyReference;
+       private long timestampReference;
+
+       @Override
+       public int hash(Tuple2<byte[], StreamRecord<IN>> record) {
+               return record.hashCode();
+       }
+
+       @Override
+       public void setReference(Tuple2<byte[], StreamRecord<IN>> toCompare) {
+               this.keyReference = Arrays.copyOf(toCompare.f0, toCompare.f0.length);
+               this.timestampReference = toCompare.f1.asRecord().getTimestamp();
+       }
+
+       @Override
+       public boolean equalToReference(Tuple2<byte[], StreamRecord<IN>> candidate) {
+               return Arrays.equals(keyReference, candidate.f0) &&
+                       timestampReference == candidate.f1.asRecord().getTimestamp();
+       }
+
+       @Override
+       public int compareToReference(TypeComparator<Tuple2<byte[], StreamRecord<IN>>> referencedComparator) {
+               byte[] otherKey = ((VariableLengthByteKeyComparator<IN>) referencedComparator).keyReference;
+               long otherTimestamp = ((VariableLengthByteKeyComparator<IN>) referencedComparator).timestampReference;
+
+               int keyCmp = compare(otherKey, this.keyReference);
+               if (keyCmp != 0) {
+                       return keyCmp;
+               }
+               return Long.compare(otherTimestamp, this.timestampReference);
+       }
+
+       @Override
+       public int compare(
+                       Tuple2<byte[], StreamRecord<IN>> first,
+                       Tuple2<byte[], StreamRecord<IN>> second) {
+               int keyCmp = compare(first.f0, second.f0);
+               if (keyCmp != 0) {
+                       return keyCmp;
+               }
+               return Long.compare(first.f1.asRecord().getTimestamp(), second.f1.asRecord().getTimestamp());
+       }
+
+       private int compare(byte[] first, byte[] second) {
+               int firstLength = first.length;
+               int secondLength = second.length;
+               int minLength = Math.min(firstLength, secondLength);
+               for (int i = 0; i < minLength; i++) {
+                       int cmp = Byte.compare(first[i], second[i]);
+
+                       if (cmp != 0) {
+                               return cmp;
+                       }
+               }
+
+               return Integer.compare(firstLength, secondLength);
+       }
+
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
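+               // Variable-length keys carry a leading length header; read both lengths
+               // before comparing the key bytes.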
+               int firstLength = firstSource.readInt();
+               int secondLength = secondSource.readInt();
+               int minLength = Math.min(firstLength, secondLength);
+               while (minLength-- > 0) {
+                       byte firstValue = firstSource.readByte();
+                       byte secondValue = secondSource.readByte();
+
+                       int cmp = Byte.compare(firstValue, secondValue);
+                       if (cmp != 0) {
+                               return cmp;
+                       }
+               }
+
+               int lengthCompare = Integer.compare(firstLength, secondLength);
+               if (lengthCompare != 0) {
+                       return lengthCompare;
+               } else {
+                       return Long.compare(firstSource.readLong(), secondSource.readLong());
+               }
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return true;
+       }
+
+       @Override
+       public int getNormalizeKeyLen() {
+               return Integer.MAX_VALUE;
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return true;
+       }
+
+       @Override
+       public void putNormalizedKey(Tuple2<byte[], StreamRecord<IN>> record, MemorySegment target, int offset, int numBytes) {
+               BytesKeyNormalizationUtil.putNormalizedKey(record, record.f0.length, target, offset, numBytes);
+       }
+
+       @Override
+       public boolean invertNormalizedKey() {
+               return false;
+       }
+
+       @Override
+       public TypeComparator<Tuple2<byte[], StreamRecord<IN>>> duplicate() {
+               return new VariableLengthByteKeyComparator<>();
+       }
+
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
+       }
+
+       @Override
+       public TypeComparator<?>[] getFlatComparators() {
+               return new TypeComparator[] {this};
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // unsupported normalization
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       @Override
+       public void writeWithKeyNormalization(
+               Tuple2<byte[], StreamRecord<IN>> record,
+               DataOutputView target) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Tuple2<byte[], StreamRecord<IN>> readWithKeyDenormalization(
+               Tuple2<byte[], StreamRecord<IN>> reuse,
+               DataInputView source) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/FixedLengthByteKeyComparatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/FixedLengthByteKeyComparatorTest.java
new file mode 100644
index 0000000..6fe0fd3
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/FixedLengthByteKeyComparatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link FixedLengthByteKeyComparator}.
+ */
+public class FixedLengthByteKeyComparatorTest extends ComparatorTestBase<Tuple2<byte[], StreamRecord<Integer>>> {
+       @Override
+       protected Order[] getTestedOrder() {
+               return new Order[]{Order.ASCENDING};
+       }
+
+       @Override
+       protected TypeComparator<Tuple2<byte[], StreamRecord<Integer>>> createComparator(boolean ascending) {
+               return new FixedLengthByteKeyComparator<>(
+                       new IntSerializer().getLength()
+               );
+       }
+
+       @Override
+       protected TypeSerializer<Tuple2<byte[], StreamRecord<Integer>>> createSerializer() {
+               IntSerializer intSerializer = new IntSerializer();
+               return new KeyAndValueSerializer<>(
+                       intSerializer,
+                       intSerializer.getLength()
+               );
+       }
+
+       @Override
+       protected void deepEquals(
+                       String message,
+                       Tuple2<byte[], StreamRecord<Integer>> should,
+                       Tuple2<byte[], StreamRecord<Integer>> is) {
+               assertThat(message, should.f0, equalTo(is.f0));
+               assertThat(message, should.f1, equalTo(is.f1));
+       }
+
+       @Override
+       protected Tuple2<byte[], StreamRecord<Integer>>[] getSortedTestData() {
+               return SerializerComparatorTestData.getOrderedIntTestData();
+       }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/FixedLengthKeyAndValueSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/FixedLengthKeyAndValueSerializerTest.java
new file mode 100644
index 0000000..50322b4
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/FixedLengthKeyAndValueSerializerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link KeyAndValueSerializer}, which verify fixed length keys.
+ */
+public class FixedLengthKeyAndValueSerializerTest extends SerializerTestBase<Tuple2<byte[], StreamRecord<Integer>>> {
+
+       private static final int INTEGER_SIZE = 4;
+       private static final int TIMESTAMP_SIZE = 8;
+
+       @Override
+       protected TypeSerializer<Tuple2<byte[], StreamRecord<Integer>>> createSerializer() {
+               IntSerializer intSerializer = new IntSerializer();
+               return new KeyAndValueSerializer<>(
+                       intSerializer,
+                       intSerializer.getLength()
+               );
+       }
+
+       @Override
+       protected int getLength() {
+               return INTEGER_SIZE + TIMESTAMP_SIZE + INTEGER_SIZE;
+       }
+
+       @Override
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       protected Class<Tuple2<byte[], StreamRecord<Integer>>> getTypeClass() {
+               return (Class<Tuple2<byte[], StreamRecord<Integer>>>) (Class) Tuple2.class;
+       }
+
+       @Override
+       protected Tuple2<byte[], StreamRecord<Integer>>[] getTestData() {
+               return SerializerComparatorTestData.getOrderedIntTestData();
+       }
+
+       @Override
+       @Test(expected = UnsupportedOperationException.class)
+       public void testConfigSnapshotInstantiation() {
+               super.testConfigSnapshotInstantiation();
+       }
+
+       @Override
+       @Test(expected = UnsupportedOperationException.class)
+       public void testSnapshotConfigurationAndReconfigure() throws Exception {
+               super.testSnapshotConfigurationAndReconfigure();
+       }
+}
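
The getLength() override above pins down the flat layout this serializer is expected to produce for a fixed-length int key: the raw key bytes, then the record's timestamp, then the serialized value. The arithmetic, spelled out (the layout is inferred from the test's constants, not stated in the file):

    // Sizes inferred from INTEGER_SIZE and TIMESTAMP_SIZE above; illustrative.
    int keyBytes = 4;       // serialized int key, IntSerializer.getLength()
    int timestampBytes = 8; // StreamRecord timestamp, a long
    int valueBytes = 4;     // serialized int value
    int expectedLength = keyBytes + timestampBytes + valueBytes; // 16 == getLength()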
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SerializerComparatorTestData.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SerializerComparatorTestData.java
new file mode 100644
index 0000000..954d55f
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SerializerComparatorTestData.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Test data for {@link VariableLengthByteKeyComparatorTest}, {@link FixedLengthByteKeyComparatorTest},
+ * {@link FixedLengthKeyAndValueSerializerTest}, and {@link VariableLengthKeyAndValueSerializerTest}.
+ */
+final class SerializerComparatorTestData {
+       @SuppressWarnings("unchecked")
+       static Tuple2<byte[], StreamRecord<Integer>>[] getOrderedIntTestData() {
+               IntSerializer intSerializer = new IntSerializer();
+               DataOutputSerializer outputSerializer = new DataOutputSerializer(intSerializer.getLength());
+
+               return IntStream.range(-10, 10)
+                       .mapToObj(
+                               idx -> {
+                                       try {
+                                               intSerializer.serialize(idx, outputSerializer);
+                                               byte[] copyOfBuffer = outputSerializer.getCopyOfBuffer();
+                                               outputSerializer.clear();
+                                               return Tuple2.of(copyOfBuffer, new StreamRecord<>(idx, idx));
+                                       } catch (IOException e) {
+                                               throw new AssertionError(e);
+                                       }
+                               }
+                       ).toArray(Tuple2[]::new);
+       }
+
+       @SuppressWarnings("unchecked")
+       static Tuple2<byte[], StreamRecord<String>>[] getOrderedStringTestData() {
+               StringSerializer stringSerializer = new StringSerializer();
+               DataOutputSerializer outputSerializer = new DataOutputSerializer(64);
+               return Stream.of(
+                       new String(new byte[] {-1, 0}),
+                       new String(new byte[] {0, 1}),
+                       "A",
+                       "AB",
+                       "ABC",
+                       "ABCD",
+                       "ABCDE",
+                       "ABCDEF",
+                       "ABCDEFG",
+                       "ABCDEFGH")
+                       .map(
+                               str -> {
+                                       try {
+                                               stringSerializer.serialize(str, outputSerializer);
+                                               byte[] copyOfBuffer = outputSerializer.getCopyOfBuffer();
+                                               outputSerializer.clear();
+                                               return Tuple2.of(copyOfBuffer, new StreamRecord<>(str, 0));
+                                       } catch (IOException e) {
+                                               throw new AssertionError(e);
+                                       }
+                               }
+                       ).sorted(
+                               (o1, o2) -> {
+                                       byte[] key0 = o1.f0;
+                                       byte[] key1 = o2.f0;
+
+                                       int firstLength = key0.length;
+                                       int secondLength = key1.length;
+                                       int minLength = Math.min(firstLength, secondLength);
+                                       for (int i = 0; i < minLength; i++) {
+                                               int cmp = Byte.compare(key0[i], key1[i]);
+
+                                               if (cmp != 0) {
+                                                       return cmp;
+                                               }
+                                       }
+
+                                       int lengthCmp = Integer.compare(firstLength, secondLength);
+                                       if (lengthCmp != 0) {
+                                               return lengthCmp;
+                                       }
+                                       return Long.compare(o1.f1.getTimestamp(), o2.f1.getTimestamp());
+                               }
+                       ).toArray(Tuple2[]::new);
+       }
+
+       private SerializerComparatorTestData() {
+       }
+}
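
The sorted(...) lambda above is plain lexicographic, signed byte-wise comparison of the serialized keys, with a shorter key sorting before any longer key it prefixes, and the record timestamp as the final tie-breaker; this is the order the comparator tests then expect to be reproduced. The key part, extracted into a hypothetical standalone helper (not part of the patch):

    // Lexicographic signed byte comparison, mirroring the lambda above.
    static int compareSerializedKeys(byte[] first, byte[] second) {
        int minLength = Math.min(first.length, second.length);
        for (int i = 0; i < minLength; i++) {
            int cmp = Byte.compare(first[i], second[i]);
            if (cmp != 0) {
                return cmp;
            }
        }
        // A strict prefix sorts before the longer key.
        return Integer.compare(first.length, second.length);
    }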
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SortingDataInputITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SortingDataInputITCase.java
new file mode 100644
index 0000000..1e5b738
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SortingDataInputITCase.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Longer running IT tests for {@link SortingDataInput}. For quicker smoke tests see {@link SortingDataInputTest}.
+ */
+public class SortingDataInputITCase {
+       @Test
+       public void intKeySorting() throws Exception {
+               int numberOfRecords = 1_000_000;
+               GeneratedRecordsDataInput input = new GeneratedRecordsDataInput(numberOfRecords);
+               KeySelector<Tuple3<Integer, String, byte[]>, Integer> keySelector = value -> value.f0;
+               try (
+                       MockEnvironment environment = MockEnvironment.builder().build();
+                       SortingDataInput<Tuple3<Integer, String, byte[]>, Integer> sortingDataInput = new SortingDataInput<>(
+                       input,
+                       GeneratedRecordsDataInput.SERIALIZER,
+                       new IntSerializer(),
+                       keySelector,
+                       environment.getMemoryManager(),
+                       environment.getIOManager(),
+                       true,
+                       1.0,
+                       new Configuration(),
+                       new DummyInvokable()
+               )) {
+                       InputStatus inputStatus;
+                       VerifyingOutput<Integer> output = new VerifyingOutput<>(keySelector);
+                       do {
+                               inputStatus = sortingDataInput.emitNext(output);
+                       } while (inputStatus != InputStatus.END_OF_INPUT);
+
+                       assertThat(output.getSeenRecords(), equalTo(numberOfRecords));
+               }
+       }
+
+       @Test
+       public void stringKeySorting() throws Exception {
+               int numberOfRecords = 1_000_000;
+               GeneratedRecordsDataInput input = new GeneratedRecordsDataInput(numberOfRecords);
+               KeySelector<Tuple3<Integer, String, byte[]>, String> keySelector = value -> value.f1;
+               try (
+                       MockEnvironment environment = MockEnvironment.builder().build();
+                       SortingDataInput<Tuple3<Integer, String, byte[]>, String> sortingDataInput = new SortingDataInput<>(
+                       input,
+                       GeneratedRecordsDataInput.SERIALIZER,
+                       new StringSerializer(),
+                       keySelector,
+                       environment.getMemoryManager(),
+                       environment.getIOManager(),
+                       true,
+                       1.0,
+                       new Configuration(),
+                       new DummyInvokable()
+               )) {
+                       InputStatus inputStatus;
+                       VerifyingOutput<String> output = new VerifyingOutput<>(keySelector);
+                       do {
+                               inputStatus = sortingDataInput.emitNext(output);
+                       } while (inputStatus != InputStatus.END_OF_INPUT);
+
+                       assertThat(output.getSeenRecords(), equalTo(numberOfRecords));
+               }
+       }
+
+       /**
+        * The idea of the tests here is to check that the keys are grouped together. Therefore there should not be a
+        * situation where we see a key different from the key of the previous record, but one that we've seen before.
+        *
+        * <p>This output verifies that invariant.
+        */
+       private static final class VerifyingOutput<E> implements PushingAsyncDataInput.DataOutput<Tuple3<Integer, String, byte[]>> {
+
+               private final KeySelector<Tuple3<Integer, String, byte[]>, E> keySelector;
+               private final Set<E> seenKeys = new LinkedHashSet<>();
+               private E currentKey = null;
+               private int seenRecords = 0;
+
+               private VerifyingOutput(KeySelector<Tuple3<Integer, String, byte[]>, E> keySelector) {
+                       this.keySelector = keySelector;
+               }
+
+               @Override
+               public void emitRecord(StreamRecord<Tuple3<Integer, String, byte[]>> streamRecord) throws Exception {
+                       this.seenRecords++;
+                       E incomingKey = keySelector.getKey(streamRecord.getValue());
+                       if (!Objects.equals(incomingKey, currentKey)) {
+                               if (!seenKeys.add(incomingKey)) {
+                                       Assert.fail("Received an out of order key: " + incomingKey);
+                               }
+                               this.currentKey = incomingKey;
+                       }
+               }
+
+               @Override
+               public void emitWatermark(Watermark watermark) throws Exception {
+
+               }
+
+               @Override
+               public void emitStreamStatus(StreamStatus streamStatus) throws Exception {
+
+               }
+
+               @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+
+               }
+
+               public int getSeenRecords() {
+                       return seenRecords;
+               }
+       }
+
+       private static final class GeneratedRecordsDataInput
+               implements StreamTaskInput<Tuple3<Integer, String, byte[]>> {
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               private static final TypeSerializer<Tuple3<Integer, String, byte[]>> SERIALIZER = new TupleSerializer<>(
+                       (Class<Tuple3<Integer, String, byte[]>>) (Class) Tuple3.class,
+                       new TypeSerializer[]{
+                               new IntSerializer(),
+                               new StringSerializer(),
+                               new BytePrimitiveArraySerializer()
+                       });
+
+               private static final String ALPHA_NUM = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+               private final long numberOfRecords;
+               private int recordsGenerated;
+               private final Random rnd = new Random();
+               private final byte[] buffer;
+
+               private GeneratedRecordsDataInput(int numberOfRecords) {
+                       this.numberOfRecords = numberOfRecords;
+                       this.recordsGenerated = 0;
+                       this.buffer = new byte[500];
+                       rnd.nextBytes(buffer);
+               }
+
+               @Override
+               public InputStatus emitNext(DataOutput<Tuple3<Integer, String, byte[]>> output) throws Exception {
+                       if (recordsGenerated >= numberOfRecords) {
+                               return InputStatus.END_OF_INPUT;
+                       }
+
+                       output.emitRecord(
+                               new StreamRecord<>(
+                                       Tuple3.of(
+                                               rnd.nextInt(),
+                                               randomString(rnd.nextInt(256)),
+                                               buffer
+                                       ),
+                                       1
+                               )
+                       );
+                       if (++recordsGenerated >= numberOfRecords) {
+                               return InputStatus.END_OF_INPUT;
+                       } else {
+                               return InputStatus.MORE_AVAILABLE;
+                       }
+               }
+
+               @Override
+               public CompletableFuture<?> getAvailableFuture() {
+                       return AvailabilityProvider.AVAILABLE;
+               }
+
+               private String randomString(int len) {
+                       StringBuilder sb = new StringBuilder(len);
+                       for (int i = 0; i < len; i++) {
+                               sb.append(ALPHA_NUM.charAt(rnd.nextInt(ALPHA_NUM.length())));
+                       }
+                       return sb.toString();
+               }
+
+               @Override
+               public int getInputIndex() {
+                       return 0;
+               }
+
+               @Override
+               public CompletableFuture<Void> prepareSnapshot(
+                               ChannelStateWriter channelStateWriter,
+                               long checkpointId) throws IOException {
+                       return CompletableFuture.completedFuture(null);
+               }
+
+               @Override
+               public void close() throws IOException {
+
+               }
+       }
+}
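
Both IT cases drive the sorting input with the same pull loop. A hypothetical helper that factors it out, using only the interfaces already imported above (a sketch, not part of the patch):

    // Pull from the input until it signals END_OF_INPUT.
    private static <T> void drainUntilEndOfInput(
            StreamTaskInput<T> input,
            PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        InputStatus status;
        do {
            status = input.emitNext(output);
        } while (status != InputStatus.END_OF_INPUT);
    }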
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SortingDataInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SortingDataInputTest.java
new file mode 100644
index 0000000..2712e8a
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SortingDataInputTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link SortingDataInput}.
+ *
+ * <p>These are rather simple unit tests. See also {@link SortingDataInputITCase} for more thorough tests.
+ */
+public class SortingDataInputTest {
+       @Test
+       public void simpleFixedLengthKeySorting() throws Exception {
+               CollectingDataOutput<Integer> collectingDataOutput = new CollectingDataOutput<>();
+               CollectionDataInput<Integer> input = new CollectionDataInput<>(
+                       Arrays.asList(
+                               new StreamRecord<>(1, 3),
+                               new StreamRecord<>(1, 1),
+                               new StreamRecord<>(2, 1),
+                               new StreamRecord<>(2, 3),
+                               new StreamRecord<>(1, 2),
+                               new StreamRecord<>(2, 2)
+                       )
+               );
+               MockEnvironment environment = MockEnvironment.builder().build();
+               SortingDataInput<Integer, Integer> sortingDataInput = new SortingDataInput<>(
+                       input,
+                       new IntSerializer(),
+                       new IntSerializer(),
+                       (KeySelector<Integer, Integer>) value -> value,
+                       environment.getMemoryManager(),
+                       environment.getIOManager(),
+                       true,
+                       1.0,
+                       new Configuration(),
+                       new DummyInvokable()
+               );
+
+               InputStatus inputStatus;
+               do {
+                       inputStatus = sortingDataInput.emitNext(collectingDataOutput);
+               } while (inputStatus != InputStatus.END_OF_INPUT);
+
+               assertThat(collectingDataOutput.events, equalTo(
+                       Arrays.asList(
+                               new StreamRecord<>(1, 1),
+                               new StreamRecord<>(1, 2),
+                               new StreamRecord<>(1, 3),
+                               new StreamRecord<>(2, 1),
+                               new StreamRecord<>(2, 2),
+                               new StreamRecord<>(2, 3)
+                       )
+               ));
+       }
+
+       @Test
+       public void watermarkPropagation() throws Exception {
+               CollectingDataOutput<Integer> collectingDataOutput = new CollectingDataOutput<>();
+               CollectionDataInput<Integer> input = new CollectionDataInput<>(
+                       Arrays.asList(
+                               new StreamRecord<>(1, 3),
+                               new Watermark(1),
+                               new StreamRecord<>(1, 1),
+                               new Watermark(2),
+                               new StreamRecord<>(2, 1),
+                               new Watermark(3),
+                               new StreamRecord<>(2, 3),
+                               new Watermark(4),
+                               new StreamRecord<>(1, 2),
+                               new Watermark(5),
+                               new StreamRecord<>(2, 2),
+                               new Watermark(6)
+                       )
+               );
+               MockEnvironment environment = MockEnvironment.builder().build();
+               SortingDataInput<Integer, Integer> sortingDataInput = new SortingDataInput<>(
+                       input,
+                       new IntSerializer(),
+                       new IntSerializer(),
+                       (KeySelector<Integer, Integer>) value -> value,
+                       environment.getMemoryManager(),
+                       environment.getIOManager(),
+                       true,
+                       1.0,
+                       new Configuration(),
+                       new DummyInvokable()
+               );
+
+               InputStatus inputStatus;
+               do {
+                       inputStatus = sortingDataInput.emitNext(collectingDataOutput);
+               } while (inputStatus != InputStatus.END_OF_INPUT);
+
+               assertThat(collectingDataOutput.events, equalTo(
+                       Arrays.asList(
+                               new StreamRecord<>(1, 1),
+                               new StreamRecord<>(1, 2),
+                               new StreamRecord<>(1, 3),
+                               new StreamRecord<>(2, 1),
+                               new StreamRecord<>(2, 2),
+                               new StreamRecord<>(2, 3),
+                               new Watermark(6)
+                       )
+               ));
+       }
+
+       @Test
+       public void simpleVariableLengthKeySorting() throws Exception {
+               CollectingDataOutput<Integer> collectingDataOutput = new CollectingDataOutput<>();
+               CollectionDataInput<Integer> input = new CollectionDataInput<>(
+                       Arrays.asList(
+                               new StreamRecord<>(1, 3),
+                               new StreamRecord<>(1, 1),
+                               new StreamRecord<>(2, 1),
+                               new StreamRecord<>(2, 3),
+                               new StreamRecord<>(1, 2),
+                               new StreamRecord<>(2, 2)
+                       )
+               );
+               MockEnvironment environment = MockEnvironment.builder().build();
+               SortingDataInput<Integer, String> sortingDataInput = new SortingDataInput<>(
+                       input,
+                       new IntSerializer(),
+                       new StringSerializer(),
+                       (KeySelector<Integer, String>) value -> "" + value,
+                       environment.getMemoryManager(),
+                       environment.getIOManager(),
+                       true,
+                       1.0,
+                       new Configuration(),
+                       new DummyInvokable()
+               );
+
+               InputStatus inputStatus;
+               do {
+                       inputStatus = sortingDataInput.emitNext(collectingDataOutput);
+               } while (inputStatus != InputStatus.END_OF_INPUT);
+
+               assertThat(collectingDataOutput.events, equalTo(
+                       Arrays.asList(
+                               new StreamRecord<>(1, 1),
+                               new StreamRecord<>(1, 2),
+                               new StreamRecord<>(1, 3),
+                               new StreamRecord<>(2, 1),
+                               new StreamRecord<>(2, 2),
+                               new StreamRecord<>(2, 3)
+                       )
+               ));
+       }
+
+       private static final class CollectionDataInput<E> implements StreamTaskInput<E> {
+
+               private final Iterator<StreamElement> elementsIterator;
+
+               private CollectionDataInput(Collection<StreamElement> elements) {
+                       this.elementsIterator = elements.iterator();
+               }
+
+               @Override
+               public InputStatus emitNext(DataOutput<E> output) throws Exception {
+                       if (elementsIterator.hasNext()) {
+                               StreamElement streamElement = elementsIterator.next();
+                               if (streamElement instanceof StreamRecord) {
+                                       output.emitRecord(streamElement.asRecord());
+                               } else if (streamElement instanceof Watermark) {
+                                       output.emitWatermark(streamElement.asWatermark());
+                               } else {
+                                       throw new IllegalStateException("Unsupported element type: " + streamElement);
+                               }
+                       }
+                       return elementsIterator.hasNext() ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT;
+               }
+
+               @Override
+               public CompletableFuture<?> getAvailableFuture() {
+                       return CompletableFuture.completedFuture(null);
+               }
+
+               @Override
+               public int getInputIndex() {
+                       return 0;
+               }
+
+               @Override
+               public CompletableFuture<Void> prepareSnapshot(
+                               ChannelStateWriter channelStateWriter,
+                               long checkpointId) throws IOException {
+                       return CompletableFuture.completedFuture(null);
+               }
+
+               @Override
+               public void close() throws IOException {
+
+               }
+       }
+
+       /**
+        * A test utility implementation of {@link PushingAsyncDataInput.DataOutput} that collects all events.
+        */
+       private static final class CollectingDataOutput<E> implements PushingAsyncDataInput.DataOutput<E> {
+
+               final List<Object> events = new ArrayList<>();
+
+               @Override
+               public void emitWatermark(Watermark watermark) throws Exception {
+                       events.add(watermark);
+               }
+
+               @Override
+               public void emitStreamStatus(StreamStatus streamStatus) throws Exception {
+                       events.add(streamStatus);
+               }
+
+               @Override
+               public void emitRecord(StreamRecord<E> streamRecord) throws Exception {
+                       events.add(streamRecord);
+               }
+
+               @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+                       events.add(latencyMarker);
+               }
+       }
+}
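
The watermarkPropagation test above pins down the buffering contract: watermarks arriving between records are swallowed while the records are buffered for sorting, and a single watermark, the largest one observed, follows the sorted records. A sketch of the bookkeeping that behaviour implies (field and method names here are hypothetical, not SortingDataInput's actual internals):

    // Hypothetical bookkeeping consistent with what the test observes.
    long largestSeenWatermark = Long.MIN_VALUE;

    void onWatermark(Watermark watermark) {
        // Swallow the watermark; remember only the maximum timestamp.
        largestSeenWatermark = Math.max(largestSeenWatermark, watermark.getTimestamp());
    }

    void afterEmittingSortedRecords(PushingAsyncDataInput.DataOutput<?> output) throws Exception {
        // Forward exactly one watermark once all sorted records are out.
        if (largestSeenWatermark != Long.MIN_VALUE) {
            output.emitWatermark(new Watermark(largestSeenWatermark));
        }
    }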
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/VariableLengthByteKeyComparatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/VariableLengthByteKeyComparatorTest.java
new file mode 100644
index 0000000..05848a2
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/VariableLengthByteKeyComparatorTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link VariableLengthByteKeyComparator}.
+ */
+public class VariableLengthByteKeyComparatorTest extends ComparatorTestBase<Tuple2<byte[], StreamRecord<String>>> {
+       @Override
+       protected Order[] getTestedOrder() {
+               return new Order[]{Order.ASCENDING};
+       }
+
+       @Override
+       protected TypeComparator<Tuple2<byte[], StreamRecord<String>>> createComparator(boolean ascending) {
+               return new VariableLengthByteKeyComparator<>();
+       }
+
+       @Override
+       protected TypeSerializer<Tuple2<byte[], StreamRecord<String>>> createSerializer() {
+               StringSerializer stringSerializer = new StringSerializer();
+               return new KeyAndValueSerializer<>(
+                       stringSerializer,
+                       stringSerializer.getLength()
+               );
+       }
+
+       @Override
+       protected void deepEquals(
+                       String message,
+                       Tuple2<byte[], StreamRecord<String>> should,
+                       Tuple2<byte[], StreamRecord<String>> is) {
+               assertThat(message, should.f0, equalTo(is.f0));
+               assertThat(message, should.f1, equalTo(is.f1));
+       }
+
+       @Override
+       protected Tuple2<byte[], StreamRecord<String>>[] getSortedTestData() {
+               return SerializerComparatorTestData.getOrderedStringTestData();
+       }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/VariableLengthKeyAndValueSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/VariableLengthKeyAndValueSerializerTest.java
new file mode 100644
index 0000000..1758657
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/VariableLengthKeyAndValueSerializerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link KeyAndValueSerializer}, which verify variable length keys.
+ */
+public class VariableLengthKeyAndValueSerializerTest extends SerializerTestBase<Tuple2<byte[], StreamRecord<String>>> {
+
+       @Override
+       protected TypeSerializer<Tuple2<byte[], StreamRecord<String>>> createSerializer() {
+               StringSerializer stringSerializer = new StringSerializer();
+               return new KeyAndValueSerializer<>(
+                       stringSerializer,
+                       stringSerializer.getLength()
+               );
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       protected Class<Tuple2<byte[], StreamRecord<String>>> getTypeClass() {
+               return (Class<Tuple2<byte[], StreamRecord<String>>>) (Class) Tuple2.class;
+       }
+
+       @Override
+       protected Tuple2<byte[], StreamRecord<String>>[] getTestData() {
+               return SerializerComparatorTestData.getOrderedStringTestData();
+       }
+
+       @Override
+       @Test(expected = UnsupportedOperationException.class)
+       public void testConfigSnapshotInstantiation() {
+               super.testConfigSnapshotInstantiation();
+       }
+
+       @Override
+       @Test(expected = UnsupportedOperationException.class)
+       public void testSnapshotConfigurationAndReconfigure() throws Exception {
+               super.testSnapshotConfigurationAndReconfigure();
+       }
+}
