[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635064#comment-16635064 ]

ASF GitHub Bot commented on KAFKA-7223:
---------------------------------------

mjsax closed pull request #5693: KAFKA-7223: In-Memory Suppression Buffering
URL: https://github.com/apache/kafka/pull/5693
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 49fe96ba20c..6db7a708541 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -19,7 +19,7 @@
 import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
 import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
 import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
-import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 
 import java.time.Duration;
 
@@ -155,6 +155,6 @@ static StrictBufferConfig unbounded() {
      * @return a suppression configuration
      */
     static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) {
-        return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null, false);
+        return new SuppressedInternal<>(timeToWaitForMoreEvents, bufferConfig, null, false);
     }
 }
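
For context: after this rename, SuppressedInternal is still only reached through the public Suppressed factory methods. A minimal sketch of such a call site, assuming hypothetical topic names and class name:

    import java.time.Duration;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Suppressed;

    public class SuppressUsageSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.table("input", Consumed.with(Serdes.String(), Serdes.Long()))
                // untilTimeLimit(...) now returns a SuppressedInternal under the hood
                .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10),
                                                    Suppressed.BufferConfig.maxRecords(1_000L)))
                .toStream()
                .to("output", Produced.with(Serdes.String(), Serdes.Long()));
        }
    }
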
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 8a2e619b7b5..9bb83733c82 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.ByteBufferDeserializer;
-import org.apache.kafka.common.serialization.ByteBufferSerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
@@ -30,6 +28,17 @@
 public class FullChangeSerde<T> implements Serde<Change<T>> {
     private final Serde<T> inner;
 
+    @SuppressWarnings("unchecked")
+    public static <T> FullChangeSerde<T> castOrWrap(final Serde<?> serde) {
+        if (serde == null) {
+            return null;
+        } else if (serde instanceof FullChangeSerde) {
+            return (FullChangeSerde<T>) serde;
+        } else {
+            return new FullChangeSerde<T>((Serde<T>) serde);
+        }
+    }
+
     public FullChangeSerde(final Serde<T> inner) {
         this.inner = requireNonNull(inner);
     }
@@ -47,7 +56,6 @@ public void close() {
     @Override
     public Serializer<Change<T>> serializer() {
         final Serializer<T> innerSerializer = inner.serializer();
-        final ByteBufferSerializer byteBufferSerializer = new ByteBufferSerializer();
 
         return new Serializer<Change<T>>() {
             @Override
@@ -65,8 +73,8 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {
                 final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue);
                 final int newSize = newBytes == null ? -1 : newBytes.length;
 
-                final ByteBuffer buffer = ByteBuffer.allocate(
-                    4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)
+                final ByteBuffer buffer = ByteBuffer.wrap(
+                    new byte[4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)]
                 );
                 buffer.putInt(oldSize);
                 if (oldBytes != null) {
@@ -76,7 +84,7 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {
                 if (newBytes != null) {
                     buffer.put(newBytes);
                 }
-                return byteBufferSerializer.serialize(null, buffer);
+                return buffer.array();
             }
 
             @Override
@@ -89,7 +97,6 @@ public void close() {
     @Override
     public Deserializer<Change<T>> deserializer() {
         final Deserializer<T> innerDeserializer = inner.deserializer();
-        final ByteBufferDeserializer byteBufferDeserializer = new ByteBufferDeserializer();
         return new Deserializer<Change<T>>() {
             @Override
             public void configure(final Map<String, ?> configs, final boolean isKey) {
@@ -101,7 +108,7 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {
                 if (data == null) {
                     return null;
                 }
-                final ByteBuffer buffer = byteBufferDeserializer.deserialize(null, data);
+                final ByteBuffer buffer = ByteBuffer.wrap(data);
 
                 final int oldSize = buffer.getInt();
                 final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
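
To make the hunks above easier to follow: the serializer writes a simple framing of [oldSize int][oldBytes][newSize int][newBytes], with -1 marking a null side, now via plain byte arrays rather than ByteBuffer(De)Serializer round trips. A standalone sketch of that framing (illustrative class and method names, not Kafka code):

    import java.nio.ByteBuffer;

    public class ChangeFramingSketch {
        // [oldSize][oldBytes][newSize][newBytes]; a -1 size marks a null side.
        static byte[] encode(final byte[] oldBytes, final byte[] newBytes) {
            final int oldSize = oldBytes == null ? -1 : oldBytes.length;
            final int newSize = newBytes == null ? -1 : newBytes.length;
            final ByteBuffer buffer = ByteBuffer.wrap(
                new byte[4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)]);
            buffer.putInt(oldSize);
            if (oldBytes != null) {
                buffer.put(oldBytes);
            }
            buffer.putInt(newSize);
            if (newBytes != null) {
                buffer.put(newBytes);
            }
            return buffer.array();
        }

        // Returns {oldBytes, newBytes}, either of which may be null.
        static byte[][] decode(final byte[] data) {
            final ByteBuffer buffer = ByteBuffer.wrap(data);
            final int oldSize = buffer.getInt();
            final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
            if (oldBytes != null) {
                buffer.get(oldBytes);
            }
            final int newSize = buffer.getInt();
            final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
            if (newBytes != null) {
                buffer.get(newBytes);
            }
            return new byte[][]{oldBytes, newBytes};
        }
    }
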
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullTimeWindowedSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullTimeWindowedSerde.java
new file mode 100644
index 00000000000..a69002f9900
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullTimeWindowedSerde.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
+import org.apache.kafka.streams.kstream.Windowed;
+
+class FullTimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> {
+    FullTimeWindowedSerde(final Serde<T> inner, final long windowSize) {
+        super(
+            new TimeWindowedSerializer<>(inner.serializer()),
+            new TimeWindowedDeserializer<>(inner.deserializer(), windowSize)
+        );
+    }
+}
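
The windowSize argument matters because a serialized time-windowed key only carries the window start; the deserializer reconstructs the end as start + windowSize. A hedged round-trip sketch using the public windowed (de)serializers (the internal TimeWindow class appears purely for illustration):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
    import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.kstream.internals.TimeWindow;

    public class WindowedRoundTripSketch {
        public static void main(final String[] args) {
            final long windowSize = 1_000L;
            final TimeWindowedSerializer<String> serializer =
                new TimeWindowedSerializer<>(Serdes.String().serializer());
            final TimeWindowedDeserializer<String> deserializer =
                new TimeWindowedDeserializer<>(Serdes.String().deserializer(), windowSize);

            final byte[] bytes =
                serializer.serialize("topic", new Windowed<>("k", new TimeWindow(0L, 1_000L)));
            final Windowed<String> roundTripped = deserializer.deserialize("topic", bytes);
            // The end timestamp is recovered from windowSize, not from the serialized bytes.
            System.out.println(roundTripped.window().end()); // 1000
        }
    }
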
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c5b29702c7c..9c76766d9c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -39,7 +39,7 @@
 import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
 import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
 import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
-import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -356,12 +356,11 @@ public String queryableStoreName() {
     public KTable<K, V> suppress(final Suppressed<K> suppressed) {
         final String name = builder.newProcessorName(SUPPRESS_NAME);
 
-        // TODO: follow-up pr to forward the k/v serdes
         final ProcessorSupplier<K, Change<V>> suppressionSupplier =
             () -> new KTableSuppressProcessor<>(
                 buildSuppress(suppressed),
-                null,
-                null
+                keySerde,
+                valSerde == null ? null : new FullChangeSerde<>(valSerde)
             );
 
         final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(
@@ -387,18 +386,18 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    private SuppressedImpl<K> buildSuppress(final Suppressed<K> suppress) {
+    private SuppressedInternal<K> buildSuppress(final Suppressed<K> suppress) {
         if (suppress instanceof FinalResultsSuppressionBuilder) {
             final long grace = findAndVerifyWindowGrace(streamsGraphNode);
 
             final FinalResultsSuppressionBuilder<?> builder = (FinalResultsSuppressionBuilder) suppress;
 
-            final SuppressedImpl<? extends Windowed> finalResultsSuppression =
+            final SuppressedInternal<? extends Windowed> finalResultsSuppression =
                 builder.buildFinalResultsSuppression(Duration.ofMillis(grace));
 
-            return (SuppressedImpl<K>) finalResultsSuppression;
-        } else if (suppress instanceof SuppressedImpl) {
-            return (SuppressedImpl<K>) suppress;
+            return (SuppressedInternal<K>) finalResultsSuppression;
+        } else if (suppress instanceof SuppressedInternal) {
+            return (SuppressedInternal<K>) suppress;
         } else {
             throw new IllegalArgumentException("Custom subclasses of Suppressed are not allowed.");
         }
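
For orientation: the FinalResultsSuppressionBuilder branch above is the one taken for Suppressed.untilWindowCloses, where the grace period is discovered from the upstream window definition via findAndVerifyWindowGrace. A hedged sketch of such a topology (topic names and class name are hypothetical):

    import java.time.Duration;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class FinalResultsSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                // grace() here is what findAndVerifyWindowGrace(streamsGraphNode) picks up
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(30)))
                .count()
                // builds a FinalResultsSuppressionBuilder, resolved in buildSuppress()
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().end())
                .to("final-counts", Produced.with(Serdes.String(), Serdes.Long()));
        }
    }
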
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 2ee8f7c5958..753ef0f85a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -27,7 +27,6 @@
 import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -93,7 +92,7 @@
             materialize(materializedInternal),
             new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
             materializedInternal.isQueryable(),
-            materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
             materializedInternal.valueSerde());
     }
 
@@ -120,7 +119,7 @@
             materialize(materializedInternal),
             new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
             materializedInternal.isQueryable(),
-            materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
             materializedInternal.valueSerde());
     }
 
@@ -149,7 +148,7 @@
             materialize(materializedInternal),
             new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer),
             materializedInternal.isQueryable(),
-            materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
             materializedInternal.valueSerde());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
similarity index 84%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
index e731dc6f5e1..67d3783867c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
@@ -20,8 +20,8 @@
 
 import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
 
-abstract class BufferConfigImpl<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
-    public abstract long maxKeys();
+abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
+    public abstract long maxRecords();
 
     public abstract long maxBytes();
 
@@ -39,12 +39,12 @@
 
     @Override
     public Suppressed.StrictBufferConfig shutDownWhenFull() {
-        return new StrictBufferConfigImpl(maxKeys(), maxBytes(), SHUT_DOWN);
+        return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN);
     }
 
     @Override
     public Suppressed.BufferConfig emitEarlyWhenFull() {
-        return new EagerBufferConfigImpl(maxKeys(), maxBytes());
+        return new EagerBufferConfigImpl(maxRecords(), maxBytes());
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index 0c2c883e18a..161f934f3a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -20,13 +20,13 @@
 
 import java.util.Objects;
 
-public class EagerBufferConfigImpl extends BufferConfigImpl {
+public class EagerBufferConfigImpl extends BufferConfigInternal {
 
-    private final long maxKeys;
+    private final long maxRecords;
     private final long maxBytes;
 
-    public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) {
-        this.maxKeys = maxKeys;
+    public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) {
+        this.maxRecords = maxRecords;
         this.maxBytes = maxBytes;
     }
 
@@ -37,12 +37,12 @@ public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) {
 
     @Override
     public Suppressed.BufferConfig withMaxBytes(final long byteLimit) {
-        return new EagerBufferConfigImpl(maxKeys, byteLimit);
+        return new EagerBufferConfigImpl(maxRecords, byteLimit);
     }
 
     @Override
-    public long maxKeys() {
-        return maxKeys;
+    public long maxRecords() {
+        return maxRecords;
     }
 
     @Override
@@ -60,17 +60,17 @@ public boolean equals(final Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
-        return maxKeys == that.maxKeys &&
+        return maxRecords == that.maxRecords &&
             maxBytes == that.maxBytes;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(maxKeys, maxBytes);
+        return Objects.hash(maxRecords, maxBytes);
     }
 
     @Override
     public String toString() {
-        return "EagerBufferConfigImpl{maxKeys=" + maxKeys + ", maxBytes=" + maxBytes + '}';
+        return "EagerBufferConfigImpl{maxRecords=" + maxRecords + ", maxBytes=" + maxBytes + '}';
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
index db09307d48c..523ae0602c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -18,7 +18,6 @@
 
 import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.time.Duration;
 import java.util.Objects;
@@ -30,11 +29,11 @@ public FinalResultsSuppressionBuilder(final Suppressed.StrictBufferConfig buffer
         this.bufferConfig = bufferConfig;
     }
 
-    public SuppressedImpl<K> buildFinalResultsSuppression(final Duration gracePeriod) {
-        return new SuppressedImpl<>(
+    public SuppressedInternal<K> buildFinalResultsSuppression(final Duration gracePeriod) {
+        return new SuppressedInternal<>(
             gracePeriod,
             bufferConfig,
-            (ProcessorContext context, K key) -> key.window().end(),
+            TimeDefinitions.WindowEndTimeDefinition.instance(),
             true
         );
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java
new file mode 100644
index 00000000000..677a662f79d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.internals.ContextualRecord;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
+    private final Map<Bytes, TimeKey> index = new HashMap<>();
+    private final TreeMap<TimeKey, ContextualRecord> sortedMap = new TreeMap<>();
+    private long memBufferSize = 0L;
+    private long minTimestamp = Long.MAX_VALUE;
+
+    @Override
+    public void evictWhile(final Supplier<Boolean> predicate,
+                           final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
+        final Iterator<Map.Entry<TimeKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
+
+        if (predicate.get()) {
+            Map.Entry<TimeKey, ContextualRecord> next = null;
+            if (delegate.hasNext()) {
+                next = delegate.next();
+            }
+
+            // predicate being true means we read one record, call the callback, and then remove it
+            while (next != null && predicate.get()) {
+                callback.accept(new KeyValue<>(next.getKey().key(), next.getValue()));
+
+                delegate.remove();
+                index.remove(next.getKey().key());
+
+                memBufferSize = memBufferSize - computeRecordSize(next.getKey().key(), next.getValue());
+
+                // peek at the next record so we can update the minTimestamp
+                if (delegate.hasNext()) {
+                    next = delegate.next();
+                    minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time();
+                } else {
+                    next = null;
+                    minTimestamp = Long.MAX_VALUE;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void put(final long time,
+                    final Bytes key,
+                    final ContextualRecord value) {
+        // non-resetting semantics:
+        // if there was a previous version of the same record,
+        // then insert the new record in the same place in the priority queue
+
+        final TimeKey previousKey = index.get(key);
+        if (previousKey == null) {
+            final TimeKey nextKey = new TimeKey(time, key);
+            index.put(key, nextKey);
+            sortedMap.put(nextKey, value);
+            minTimestamp = Math.min(minTimestamp, time);
+            memBufferSize = memBufferSize + computeRecordSize(key, value);
+        } else {
+            final ContextualRecord removedValue = sortedMap.put(previousKey, value);
+            memBufferSize =
+                memBufferSize
+                    + computeRecordSize(key, value)
+                    - (removedValue == null ? 0 : computeRecordSize(key, removedValue));
+        }
+    }
+
+    @Override
+    public int numRecords() {
+        return index.size();
+    }
+
+    @Override
+    public long bufferSize() {
+        return memBufferSize;
+    }
+
+    @Override
+    public long minTimestamp() {
+        return minTimestamp;
+    }
+
+    private long computeRecordSize(final Bytes key, final ContextualRecord value) {
+        long size = 0L;
+        size += 8; // buffer time
+        size += key.get().length;
+        if (value != null) {
+            size += value.sizeBytes();
+        }
+        return size;
+    }
+}
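
To make the two-structure design above concrete: the hash index maps each key to its (time, key) sort key, while the TreeMap keeps records ordered by buffer time; re-buffering an existing key reuses its old sort key, so the record keeps its original position (the "non-resetting semantics" noted in put()). A simplified, standalone sketch with String keys and values (illustrative only, not the Kafka class):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    public class TimeOrderedBufferSketch {
        // Composite sort key: time first, then key, mirroring TimeKey below.
        static final class SortKey implements Comparable<SortKey> {
            final long time;
            final String key;

            SortKey(final long time, final String key) {
                this.time = time;
                this.key = key;
            }

            @Override
            public int compareTo(final SortKey o) {
                final int byTime = Long.compare(time, o.time);
                return byTime != 0 ? byTime : key.compareTo(o.key);
            }
        }

        private final Map<String, SortKey> index = new HashMap<>();
        private final TreeMap<SortKey, String> sorted = new TreeMap<>();

        void put(final long time, final String key, final String value) {
            final SortKey previous = index.get(key);
            if (previous == null) {
                final SortKey sortKey = new SortKey(time, key);
                index.put(key, sortKey);
                sorted.put(sortKey, value);
            } else {
                sorted.put(previous, value); // update in place: keep the original buffer time
            }
        }

        // Oldest buffered time, or Long.MAX_VALUE when empty, like minTimestamp().
        long minTimestamp() {
            return sorted.isEmpty() ? Long.MAX_VALUE : sorted.firstKey().time;
        }

        public static void main(final String[] args) {
            final TimeOrderedBufferSketch buffer = new TimeOrderedBufferSketch();
            buffer.put(5L, "a", "v1");
            buffer.put(9L, "a", "v2"); // "a" stays buffered at time 5
            System.out.println(buffer.minTimestamp()); // prints 5
        }
    }
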
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 6f0021fbc49..57e5066d09e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -17,70 +17,117 @@
 package org.apache.kafka.streams.kstream.internals.suppress;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-
-import java.time.Duration;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.state.internals.ContextualRecord;
 
 import static java.util.Objects.requireNonNull;
 
 public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
-    private final SuppressedImpl<K> suppress;
+    private final long maxRecords;
+    private final long maxBytes;
+    private final long suppressDurationMillis;
+    private final TimeOrderedKeyValueBuffer buffer;
+    private final TimeDefinition<K> bufferTimeDefinition;
+    private final BufferFullStrategy bufferFullStrategy;
+    private final boolean shouldSuppressTombstones;
     private InternalProcessorContext internalProcessorContext;
 
-    private final Serde<K> keySerde;
-    private final Serde<Change<V>> valueSerde;
+    private Serde<K> keySerde;
+    private Serde<Change<V>> valueSerde;
 
-    public KTableSuppressProcessor(final SuppressedImpl<K> suppress,
+    public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
                                    final Serde<K> keySerde,
-                                   final Serde<Change<V>> valueSerde) {
-        this.suppress = requireNonNull(suppress);
+                                   final FullChangeSerde<V> valueSerde) {
+        requireNonNull(suppress);
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
+        maxRecords = suppress.getBufferConfig().maxRecords();
+        maxBytes = suppress.getBufferConfig().maxBytes();
+        suppressDurationMillis = suppress.getTimeToWaitForMoreEvents().toMillis();
+        buffer = new InMemoryTimeOrderedKeyValueBuffer();
+        bufferTimeDefinition = suppress.getTimeDefinition();
+        bufferFullStrategy = suppress.getBufferConfig().bufferFullStrategy();
+        shouldSuppressTombstones = suppress.shouldSuppressTombstones();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context) {
         internalProcessorContext = (InternalProcessorContext) context;
+        this.keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
+        this.valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
     }
 
     @Override
     public void process(final K key, final Change<V> value) {
-        if (suppress.getTimeToWaitForMoreEvents() == Duration.ZERO && definedRecordTime(key) <= internalProcessorContext.streamTime()) {
-            if (shouldForward(value)) {
-                internalProcessorContext.forward(key, value);
-            } // else skip
-        } else {
-            throw new NotImplementedException();
-        }
+        buffer(key, value);
+        enforceConstraints();
     }
 
-    private boolean shouldForward(final Change<V> value) {
-        return !(value.newValue == null && suppress.suppressTombstones());
-    }
+    private void buffer(final K key, final Change<V> value) {
+        final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
+        final ProcessorRecordContext recordContext = internalProcessorContext.recordContext();
 
-    private long definedRecordTime(final K key) {
-        return suppress.getTimeDefinition().time(internalProcessorContext, key);
+        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, key));
+        final byte[] serializedValue = valueSerde.serializer().serialize(null, value);
+
+        buffer.put(bufferTime, serializedKey, new ContextualRecord(serializedValue, recordContext));
     }
 
-    @Override
-    public void close() {
+    private void enforceConstraints() {
+        final long streamTime = internalProcessorContext.streamTime();
+        final long expiryTime = streamTime - suppressDurationMillis;
+
+        buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit);
+
+        if (overCapacity()) {
+            switch (bufferFullStrategy) {
+                case EMIT:
+                    buffer.evictWhile(this::overCapacity, this::emit);
+                    return;
+                case SHUT_DOWN:
+                    throw new StreamsException(String.format(
+                        "%s buffer exceeded its max capacity. Currently [%d/%d] records and [%d/%d] bytes.",
+                        internalProcessorContext.currentNode().name(),
+                        buffer.numRecords(), maxRecords,
+                        buffer.bufferSize(), maxBytes
+                    ));
+            }
+        }
     }
 
-    @Override
-    public String toString() {
-        return "KTableSuppressProcessor{" +
-            "suppress=" + suppress +
-            ", keySerde=" + keySerde +
-            ", valueSerde=" + valueSerde +
-            '}';
+    private boolean overCapacity() {
+        return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes;
     }
 
-    public static class NotImplementedException extends RuntimeException {
-        NotImplementedException() {
-            super();
+    private void emit(final KeyValue<Bytes, ContextualRecord> toEmit) {
+        final Change<V> value = valueSerde.deserializer().deserialize(null, toEmit.value.value());
+        if (shouldForward(value)) {
+            final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
+            internalProcessorContext.setRecordContext(toEmit.value.recordContext());
+            try {
+                final K key = keySerde.deserializer().deserialize(null, toEmit.key.get());
+                internalProcessorContext.forward(key, value);
+            } finally {
+                internalProcessorContext.setRecordContext(prevRecordContext);
+            }
         }
     }
+
+    private boolean shouldForward(final Change<V> value) {
+        return !(value.newValue == null && shouldSuppressTombstones);
+    }
+
+    @Override
+    public void close() {
+    }
 }
\ No newline at end of file
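
A worked example of enforceConstraints() above: with suppressDurationMillis = 100 and streamTime = 1000, expiryTime is 900, so eviction emits buffered records oldest-first while the minimum buffered timestamp is at most 900; anything still over capacity afterwards is either emitted early (EMIT) or fails the task (SHUT_DOWN). A tiny sketch of just that arithmetic (values are made up):

    public class ExpirySketch {
        public static void main(final String[] args) {
            final long suppressDurationMillis = 100L;
            final long streamTime = 1_000L;
            final long expiryTime = streamTime - suppressDurationMillis; // 900

            // Mirrors buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit):
            // records buffered at these times would be evicted oldest-first.
            for (final long minTimestamp : new long[]{850L, 900L, 950L}) {
                System.out.println(minTimestamp + " evicted? " + (minTimestamp <= expiryTime));
            }
        }
    }
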
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
index 0634a748a5b..ef754ec6fc2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
@@ -22,22 +22,22 @@
 
 import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
 
-public class StrictBufferConfigImpl extends BufferConfigImpl<Suppressed.StrictBufferConfig> implements Suppressed.StrictBufferConfig {
+public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.StrictBufferConfig> implements Suppressed.StrictBufferConfig {
 
-    private final long maxKeys;
+    private final long maxRecords;
     private final long maxBytes;
     private final BufferFullStrategy bufferFullStrategy;
 
-    public StrictBufferConfigImpl(final long maxKeys,
+    public StrictBufferConfigImpl(final long maxRecords,
                                   final long maxBytes,
                                   final BufferFullStrategy bufferFullStrategy) {
-        this.maxKeys = maxKeys;
+        this.maxRecords = maxRecords;
         this.maxBytes = maxBytes;
         this.bufferFullStrategy = bufferFullStrategy;
     }
 
     public StrictBufferConfigImpl() {
-        this.maxKeys = Long.MAX_VALUE;
+        this.maxRecords = Long.MAX_VALUE;
         this.maxBytes = Long.MAX_VALUE;
         this.bufferFullStrategy = SHUT_DOWN;
     }
@@ -49,12 +49,12 @@ public StrictBufferConfigImpl() {
 
     @Override
     public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
-        return new StrictBufferConfigImpl(maxKeys, byteLimit, bufferFullStrategy);
+        return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy);
     }
 
     @Override
-    public long maxKeys() {
-        return maxKeys;
+    public long maxRecords() {
+        return maxRecords;
     }
 
     @Override
@@ -72,19 +72,19 @@ public boolean equals(final Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o;
-        return maxKeys == that.maxKeys &&
+        return maxRecords == that.maxRecords &&
             maxBytes == that.maxBytes &&
             bufferFullStrategy == that.bufferFullStrategy;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(maxKeys, maxBytes, bufferFullStrategy);
+        return Objects.hash(maxRecords, maxBytes, bufferFullStrategy);
     }
 
     @Override
     public String toString() {
-        return "StrictBufferConfigImpl{maxKeys=" + maxKeys +
+        return "StrictBufferConfigImpl{maxRecords=" + maxRecords +
             ", maxBytes=" + maxBytes +
             ", bufferFullStrategy=" + bufferFullStrategy + '}';
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
similarity index 71%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
index a3bf2db63a2..99245dae544 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
@@ -17,32 +17,32 @@
 package org.apache.kafka.streams.kstream.internals.suppress;
 
 import org.apache.kafka.streams.kstream.Suppressed;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
 
 import java.time.Duration;
 import java.util.Objects;
 
-public class SuppressedImpl<K> implements Suppressed<K> {
+public class SuppressedInternal<K> implements Suppressed<K> {
     private static final Duration DEFAULT_SUPPRESSION_TIME = Duration.ofMillis(Long.MAX_VALUE);
-    private static final StrictBufferConfig DEFAULT_BUFFER_CONFIG = BufferConfig.unbounded();
+    private static final StrictBufferConfigImpl DEFAULT_BUFFER_CONFIG = (StrictBufferConfigImpl) BufferConfig.unbounded();
 
-    private final BufferConfig bufferConfig;
+    private final BufferConfigInternal bufferConfig;
     private final Duration timeToWaitForMoreEvents;
     private final TimeDefinition<K> timeDefinition;
     private final boolean suppressTombstones;
 
-    public SuppressedImpl(final Duration suppressionTime,
-                          final BufferConfig bufferConfig,
-                          final TimeDefinition<K> timeDefinition,
-                          final boolean suppressTombstones) {
+    public SuppressedInternal(final Duration suppressionTime,
+                              final BufferConfig bufferConfig,
+                              final TimeDefinition<K> timeDefinition,
+                              final boolean suppressTombstones) {
         this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime;
-        this.timeDefinition = timeDefinition == null ? (context, anyKey) -> context.timestamp() : timeDefinition;
-        this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : bufferConfig;
+        this.timeDefinition = timeDefinition == null ? TimeDefinitions.RecordTimeDefintion.instance() : timeDefinition;
+        this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : (BufferConfigInternal) bufferConfig;
         this.suppressTombstones = suppressTombstones;
     }
 
-    interface TimeDefinition<K> {
-        long time(final ProcessorContext context, final K key);
+    BufferConfigInternal getBufferConfig() {
+        return bufferConfig;
     }
 
     TimeDefinition<K> getTimeDefinition() {
@@ -53,11 +53,15 @@ Duration getTimeToWaitForMoreEvents() {
         return timeToWaitForMoreEvents == null ? Duration.ZERO : timeToWaitForMoreEvents;
     }
 
+    boolean shouldSuppressTombstones() {
+        return suppressTombstones;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-        final SuppressedImpl<?> that = (SuppressedImpl<?>) o;
+        final SuppressedInternal<?> that = (SuppressedInternal<?>) o;
         return Objects.equals(bufferConfig, that.bufferConfig) &&
             Objects.equals(getTimeToWaitForMoreEvents(), that.getTimeToWaitForMoreEvents()) &&
             Objects.equals(getTimeDefinition(), that.getTimeDefinition());
@@ -70,14 +74,10 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        return "SuppressedImpl{" +
+        return "SuppressedInternal{" +
             ", bufferConfig=" + bufferConfig +
             ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents +
             ", timeDefinition=" + timeDefinition +
             '}';
     }
-
-    boolean suppressTombstones() {
-        return suppressTombstones;
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
new file mode 100644
index 00000000000..b37bcf663fd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+final class TimeDefinitions {
+    private TimeDefinitions() {}
+
+    enum TimeDefinitionType {
+        RECORD_TIME, WINDOW_END_TIME;
+    }
+
+    /**
+     * This interface should never be instantiated outside of this class.
+     */
+    interface TimeDefinition<K> {
+        long time(final ProcessorContext context, final K key);
+
+        TimeDefinitionType type();
+    }
+
+    public static class RecordTimeDefintion<K> implements TimeDefinition<K> {
+        private static final RecordTimeDefintion INSTANCE = new RecordTimeDefintion();
+
+        private RecordTimeDefintion() {}
+
+        @SuppressWarnings("unchecked")
+        public static <K> RecordTimeDefintion<K> instance() {
+            return RecordTimeDefintion.INSTANCE;
+        }
+
+        @Override
+        public long time(final ProcessorContext context, final K key) {
+            return context.timestamp();
+        }
+
+        @Override
+        public TimeDefinitionType type() {
+            return TimeDefinitionType.RECORD_TIME;
+        }
+    }
+
+    public static class WindowEndTimeDefinition<K extends Windowed> implements TimeDefinition<K> {
+        private static final WindowEndTimeDefinition INSTANCE = new WindowEndTimeDefinition();
+
+        private WindowEndTimeDefinition() {}
+
+        @SuppressWarnings("unchecked")
+        public static <K extends Windowed> WindowEndTimeDefinition<K> instance() {
+            return WindowEndTimeDefinition.INSTANCE;
+        }
+
+        @Override
+        public long time(final ProcessorContext context, final K key) {
+            return key.window().end();
+        }
+
+        @Override
+        public TimeDefinitionType type() {
+            return TimeDefinitionType.WINDOW_END_TIME;
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java
new file mode 100644
index 00000000000..d3ad350686a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Objects;
+
+class TimeKey implements Comparable<TimeKey> {
+    private final long time;
+    private final Bytes key;
+
+    TimeKey(final long time, final Bytes key) {
+        this.time = time;
+        this.key = key;
+    }
+
+    Bytes key() {
+        return key;
+    }
+
+    long time() {
+        return time;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final TimeKey timeKey = (TimeKey) o;
+        return time == timeKey.time &&
+            Objects.equals(key, timeKey.key);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(time, key);
+    }
+
+    @Override
+    public int compareTo(final TimeKey o) {
+        // ordering is primarily by time; keys within the same time are ordered by their byte-lexicographic comparison.
+        final int timeComparison = Long.compare(time, o.time);
+        return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java
new file mode 100644
index 00000000000..98a4f63c83f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.internals.ContextualRecord;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+interface TimeOrderedKeyValueBuffer {
+    void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback);
+
+    void put(final long time, final Bytes key, final ContextualRecord value);
+
+    int numRecords();
+
+    long bufferSize();
+
+    long minTimestamp();
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index dd572649765..cd4657bdcd8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.RecordContext;
 
@@ -78,6 +79,23 @@ public Headers headers() {
         return headers;
     }
 
+    public long sizeBytes() {
+        long size = 0L;
+        size += 8; // value.context.timestamp
+        size += 8; // value.context.offset
+        if (topic != null) {
+            size += topic.toCharArray().length;
+        }
+        size += 4; // partition
+        if (headers != null) {
+            for (final Header header : headers) {
+                size += header.key().toCharArray().length;
+                size += header.value().length;
+            }
+        }
+        return size;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) return true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index c016f640cb2..a6a24ea098f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -89,7 +89,7 @@ public void apply(final List<ThreadCache.DirtyEntry> entries) {
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
         final ProcessorRecordContext current = context.recordContext();
         try {
-            context.setRecordContext(entry.recordContext());
+            context.setRecordContext(entry.entry().context());
             if (flushListener != null) {
                 V oldValue = null;
                 if (sendOldValues) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 2da5ab98550..cbcb7490efb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -169,7 +169,7 @@ public void put(final Windowed<Bytes> key, final byte[] value) {
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
         final Bytes binaryKey = cacheFunction.key(entry.key());
         final ProcessorRecordContext current = context.recordContext();
-        context.setRecordContext(entry.recordContext());
+        context.setRecordContext(entry.entry().context());
         try {
             final Windowed<K> key = SessionKeySchema.from(binaryKey.get(), serdes.keyDeserializer(), topic);
             final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 688e88962a2..f8d9ad590a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -108,7 +108,7 @@ private void maybeForward(final ThreadCache.DirtyEntry entry,
                               final InternalProcessorContext context) {
         if (flushListener != null) {
             final ProcessorRecordContext current = context.recordContext();
-            context.setRecordContext(entry.recordContext());
+            context.setRecordContext(entry.entry().context());
             try {
                 final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
                 flushListener.apply(windowedKey, serdes.valueFrom(entry.newValue()), oldValue);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
new file mode 100644
index 00000000000..89935c09d8f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+public class ContextualRecord {
+    private final byte[] value;
+    private final ProcessorRecordContext recordContext;
+
+    public ContextualRecord(final byte[] value, final ProcessorRecordContext recordContext) {
+        this.value = value;
+        this.recordContext = recordContext;
+    }
+
+    public ProcessorRecordContext recordContext() {
+        return recordContext;
+    }
+
+    public byte[] value() {
+        return value;
+    }
+
+    public long sizeBytes() {
+        return (value == null ? 0 : value.length) + recordContext.sizeBytes();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final ContextualRecord that = (ContextualRecord) o;
+        return Arrays.equals(value, that.value) &&
+            Objects.equals(recordContext, that.recordContext);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(value, recordContext);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index 0ac0b77dd37..53436358ded 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -19,18 +19,17 @@
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 
-import java.util.Arrays;
 import java.util.Objects;
 
 /**
  * A cache entry
  */
-class LRUCacheEntry extends ProcessorRecordContext {
-
-    private final byte[] value;
+class LRUCacheEntry {
+    private final ContextualRecord record;
     private final long sizeBytes;
     private boolean isDirty;
 
+
     LRUCacheEntry(final byte[] value) {
         this(value, null, false, -1, -1, -1, "");
     }
@@ -42,15 +41,16 @@
                   final long timestamp,
                   final int partition,
                   final String topic) {
-        super(timestamp, offset, partition, topic, headers);
-        this.value = value;
+        final ProcessorRecordContext context = new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
+
+        this.record = new ContextualRecord(
+            value,
+            context
+        );
+
         this.isDirty = isDirty;
-        this.sizeBytes = (value == null ? 0 : value.length) +
-                1 + // isDirty
-                8 + // timestamp
-                8 + // offset
-                4 + // partition
-                (topic == null ? 0 : topic.length());
+        this.sizeBytes = 1 + // isDirty
+            record.sizeBytes();
     }
 
     void markClean() {
@@ -66,7 +66,11 @@ long size() {
     }
 
     byte[] value() {
-        return value;
+        return record.value();
+    }
+
+    public ProcessorRecordContext context() {
+        return record.recordContext();
     }
 
     @Override
@@ -74,17 +78,13 @@ public boolean equals(final Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         final LRUCacheEntry that = (LRUCacheEntry) o;
-        return timestamp() == that.timestamp() &&
-                offset() == that.offset() &&
-                partition() == that.partition() &&
-                Objects.equals(topic(), that.topic()) &&
-                Objects.equals(headers(), that.headers()) &&
-                Arrays.equals(this.value, that.value()) &&
-                this.isDirty == that.isDirty();
+        return sizeBytes == that.sizeBytes &&
+            isDirty() == that.isDirty() &&
+            Objects.equals(record, that.record);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(timestamp(), offset(), topic(), partition(), headers(), value, isDirty);
+        return Objects.hash(record, sizeBytes, isDirty());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 27270e6b51d..941b5221524 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.slf4j.Logger;
 
@@ -332,9 +331,9 @@ public void close() {
     static class DirtyEntry {
         private final Bytes key;
         private final byte[] newValue;
-        private final ProcessorRecordContext recordContext;
+        private final LRUCacheEntry recordContext;
 
-        DirtyEntry(final Bytes key, final byte[] newValue, final ProcessorRecordContext recordContext) {
+        DirtyEntry(final Bytes key, final byte[] newValue, final LRUCacheEntry recordContext) {
             this.key = key;
             this.newValue = newValue;
             this.recordContext = recordContext;
@@ -348,7 +347,7 @@ public Bytes key() {
             return newValue;
         }
 
-        public ProcessorRecordContext recordContext() {
+        public LRUCacheEntry entry() {
             return recordContext;
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index af91abaf2b1..a9920e3a6f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -49,7 +49,6 @@
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -75,6 +74,9 @@
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
 import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category({IntegrationTest.class})
 public class SuppressionIntegrationTest {
@@ -88,7 +90,6 @@
     private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2;
     private static final long TIMEOUT_MS = 30_000L;
 
-    @Ignore
     @Test
     public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedException {
         final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter";
@@ -220,10 +221,9 @@ public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr
         }
     }
 
-    @Ignore
     @Test
     public void shouldSuppressIntermediateEventsWithRecordLimit() throws InterruptedException {
-        final String testId = "-shouldSuppressIntermediateEventsWithKeyLimit";
+        final String testId = "-shouldSuppressIntermediateEventsWithRecordLimit";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
@@ -279,7 +279,46 @@ public void shouldSuppressIntermediateEventsWithRecordLimit() throws Interrupted
         }
     }
 
-    @Ignore
+    @Test
+    public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException {
+        final String testId = "-shouldShutdownWhenRecordConstraintIsViolated";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
+
+        valueCounts
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).shutDownWhenFull()))
+            .toStream()
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+                )
+            );
+            verifyErrorShutdown(driver);
+        } finally {
+            driver.close();
+            cleanStateAfterTest(driver);
+        }
+    }
+
     @Test
     public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedException {
         final String testId = "-shouldSuppressIntermediateEventsWithBytesLimit";
@@ -339,7 +378,47 @@ public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedE
         }
     }
 
-    @Ignore
+    @Test
+    public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedException {
+        final String testId = "-shouldShutdownWhenBytesConstraintIsViolated";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
+
+        valueCounts
+            // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
+            .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull()))
+            .toStream()
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+                )
+            );
+            verifyErrorShutdown(driver);
+        } finally {
+            driver.close();
+            cleanStateAfterTest(driver);
+        }
+    }
+
     @Test
     public void shouldSupportFinalResultsForTimeWindows() throws InterruptedException {
         final String testId = "-shouldSupportFinalResultsForTimeWindows";
@@ -479,6 +558,11 @@ private void produceSynchronously(final String topic, final List<KeyValueTimesta
         }
     }
 
+    private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
+        waitForCondition(() -> !driver.state().isRunning(), TIMEOUT_MS, "Streams didn't shut down.");
+        assertThat(driver.state(), is(KafkaStreams.State.ERROR));
+    }
+
     private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> expected) {
         final List<ConsumerRecord<String, Long>> results;
         try {
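
The two new integration tests above drive a suppression buffer past its configured bound and assert that the application transitions to the ERROR state instead of emitting early. For reference, a hedged sketch of the two buffer-config flavors the tests contrast (the topology around the calls is invented for illustration; only the Suppressed API usage is taken from this diff):

    import static java.time.Duration.ofMillis;
    import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
    import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
    import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;

    import org.apache.kafka.streams.kstream.KTable;

    class SuppressionConfigSketch {
        static <K> void configure(final KTable<K, Long> counts) {
            // eager config: when the buffer would exceed one record, emit the oldest early
            counts.suppress(untilTimeLimit(ofMillis(Long.MAX_VALUE), maxRecords(1L)));

            // strict config: never emit early; throw and shut the application down instead
            counts.suppress(untilTimeLimit(ofMillis(Long.MAX_VALUE), maxBytes(200L).shutDownWhenFull()));
        }
    }
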
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
index 7650c59759e..fcb5ba8ef3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
@@ -19,7 +19,7 @@
 import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
 import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
 import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
-import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.junit.Test;
 
 import static java.lang.Long.MAX_VALUE;
@@ -61,31 +61,31 @@ public void intermediateEventsShouldAcceptAnyBufferAndSetBounds() {
         assertThat(
             "time alone should be set",
             untilTimeLimit(ofMillis(2), unbounded()),
-            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null, false))
+            is(new SuppressedInternal<>(ofMillis(2), unbounded(), null, false))
         );
 
         assertThat(
             "time and unbounded buffer should be set",
             untilTimeLimit(ofMillis(2), unbounded()),
-            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null, false))
+            is(new SuppressedInternal<>(ofMillis(2), unbounded(), null, false))
         );
 
         assertThat(
             "time and keys buffer should be set",
             untilTimeLimit(ofMillis(2), maxRecords(2)),
-            is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null, false))
+            is(new SuppressedInternal<>(ofMillis(2), maxRecords(2), null, false))
         );
 
         assertThat(
             "time and size buffer should be set",
             untilTimeLimit(ofMillis(2), maxBytes(2)),
-            is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null, false))
+            is(new SuppressedInternal<>(ofMillis(2), maxBytes(2), null, false))
         );
 
         assertThat(
             "all constraints should be set",
             untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)),
-            is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null, false))
+            is(new SuppressedInternal<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null, false))
         );
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index d98a15e093b..222e1d63982 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
@@ -32,7 +31,6 @@
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -41,7 +39,6 @@
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.WindowStore;
@@ -60,6 +57,7 @@
 import static java.time.Duration.ZERO;
 import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
@@ -159,7 +157,7 @@ public void shouldImmediatelyEmitEventsWithZeroEmitAfter() {
         }
     }
 
-    @Test(expected = ProcessorStateException.class)
+    @Test
     public void shouldSuppressIntermediateEventsWithTimeLimit() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = builder
@@ -198,11 +196,9 @@ public void shouldSuppressIntermediateEventsWithTimeLimit() {
                     new KeyValueTimestamp<>("v1", 1L, 2L)
                 )
             );
-            // note that the current stream time is 2, which causes v1 to age out of the buffer, since
-            // it has been buffered since time 0 (even though the current version of it in the buffer has timestamp 1)
             verify(
                 drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
-                singletonList(new KeyValueTimestamp<>("v1", 0L, 1L))
+                singletonList(new KeyValueTimestamp<>("v1", 1L, 2L))
             );
             // inserting a dummy "tick" record just to advance stream time
             driver.pipeInput(recordFactory.create("input", "tick", "tick", 3L));
@@ -225,36 +221,15 @@ public void shouldSuppressIntermediateEventsWithTimeLimit() {
                     new KeyValueTimestamp<>("tick", 1L, 4L)
                 )
             );
+            // tick is still buffered, since it was first inserted at time 3, and it is only time 4 right now.
             verify(
                 drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
-                singletonList(
-                    new KeyValueTimestamp<>("v1", 1L, 2L)
-                )
-            );
-            driver.pipeInput(recordFactory.create("input", "tick", "tick", 5L));
-            verify(
-                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
-                asList(
-                    new KeyValueTimestamp<>("tick", 0L, 5L),
-                    new KeyValueTimestamp<>("tick", 1L, 5L)
-                )
-            );
-            // Note that because the punctuate runs before the process call, the tick at time 5 causes
-            // the previous tick to age out of the buffer, so at this point, we see the prior value emitted
-            // and the new value is still buffered.
-
-            // Also worth noting is that "tick" ages out because it has been buffered since time 3, even though
-            // the current timestamp of the buffered record is 4.
-            verify(
-                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
-                singletonList(
-                    new KeyValueTimestamp<>("tick", 1L, 4L)
-                )
+                emptyList()
             );
         }
     }
 
-    @Test(expected = ProcessorStateException.class)
+    @Test
     public void shouldSuppressIntermediateEventsWithRecordLimit() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = builder
@@ -320,7 +295,7 @@ public void shouldSuppressIntermediateEventsWithRecordLimit() {
         }
     }
 
-    @Test(expected = ProcessorStateException.class)
+    @Test
     public void shouldSuppressIntermediateEventsWithBytesLimit() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = builder
@@ -351,8 +326,7 @@ public void shouldSuppressIntermediateEventsWithBytesLimit() {
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
             driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
-            final ConsumerRecord<byte[], byte[]> consumerRecord = recordFactory.create("input", "k2", "v1", 2L);
-            driver.pipeInput(consumerRecord);
+            driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L));
             verify(
                 drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
@@ -388,7 +362,7 @@ public void shouldSuppressIntermediateEventsWithBytesLimit() {
         }
     }
 
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void shouldSupportFinalResultsForTimeWindows() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Windowed<String>, Long> valueCounts = builder
@@ -442,7 +416,7 @@ public void shouldSupportFinalResultsForTimeWindows() {
         }
     }
 
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Windowed<String>, Long> valueCounts = builder
@@ -501,7 +475,7 @@ public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() {
         }
     }
 
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void shouldSupportFinalResultsForSessionWindows() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Windowed<String>, Long> valueCounts = builder
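
The reworked expectations in SuppressScenarioTest above capture the new time-limit semantics: a record is held from the time its key first enters the buffer, and eviction is evaluated as stream time advances with each processed record rather than on a punctuation. A minimal sketch of that eviction rule, assuming an insertion-ordered buffer (the names here are illustrative, not the internal buffer API):

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.BiConsumer;

    class TimeLimitBufferSketch<K, V> {
        private final long suppressDurationMs;
        private final Map<K, Long> bufferTimes = new LinkedHashMap<>(); // key -> first-buffered time
        private final Map<K, V> values = new LinkedHashMap<>();

        TimeLimitBufferSketch(final long suppressDurationMs) {
            this.suppressDurationMs = suppressDurationMs;
        }

        void put(final K key, final V value, final long streamTime) {
            bufferTimes.putIfAbsent(key, streamTime); // later updates keep the original buffer time
            values.put(key, value);
        }

        // evaluated on every processed record, once the new stream time is known
        void evict(final long streamTime, final BiConsumer<K, V> emit) {
            final Iterator<Map.Entry<K, Long>> it = bufferTimes.entrySet().iterator();
            while (it.hasNext()) {
                final Map.Entry<K, Long> oldest = it.next();
                if (streamTime - oldest.getValue() < suppressDurationMs) {
                    break; // entries are insertion-ordered, so the rest are newer
                }
                emit.accept(oldest.getKey(), values.remove(oldest.getKey()));
                it.remove();
            }
        }
    }

Under this rule, with a 2 ms limit, "tick" buffered at time 3 is still held at stream time 4 (4 - 3 < 2), which is exactly what the updated test asserts.
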
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index a38d1d58f43..bb7f49ce7a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -16,15 +16,20 @@
  */
 package org.apache.kafka.streams.kstream.internals.suppress;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.test.MockInternalProcessorContext;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -38,25 +43,23 @@
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.common.serialization.Serdes.Long;
 import static org.apache.kafka.common.serialization.Serdes.String;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
 import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 import static org.apache.kafka.streams.kstream.WindowedSerdes.sessionWindowedSerdeFrom;
-import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("PointlessArithmeticExpression")
 public class KTableSuppressProcessorTest {
     private static final long ARBITRARY_LONG = 5L;
 
-    private static final long ARBITRARY_TIMESTAMP = 1993L;
-
     private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L);
 
-    private static final TimeWindow ARBITRARY_WINDOW = new TimeWindow(0L, 100L);
-
     @Test
     public void zeroTimeLimitShouldImmediatelyEmit() {
         final KTableSuppressProcessor<String, Long> processor =
@@ -66,7 +69,7 @@ public void zeroTimeLimitShouldImmediatelyEmit() {
         processor.init(context);
 
         final long timestamp = ARBITRARY_LONG;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final String key = "hey";
         final Change<Long> value = ARBITRARY_CHANGE;
@@ -83,7 +86,7 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() {
         final KTableSuppressProcessor<Windowed<String>, Long> processor =
             new KTableSuppressProcessor<>(
                 getImpl(untilTimeLimit(ZERO, unbounded())),
-                timeWindowedSerdeFrom(String.class),
+                timeWindowedSerdeFrom(String.class, 100L),
                 new FullChangeSerde<>(Long())
             );
 
@@ -91,9 +94,9 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() {
         processor.init(context);
 
         final long timestamp = ARBITRARY_LONG;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
-        final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW);
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
 
@@ -103,7 +106,7 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void intermediateSuppressionShouldBufferAndEmitLater() {
         final KTableSuppressProcessor<String, Long> processor =
             new KTableSuppressProcessor<>(
@@ -117,13 +120,15 @@ public void intermediateSuppressionShouldBufferAndEmitLater() {
 
         final long timestamp = 0L;
         context.setRecordMetadata("topic", 0, 0, null, timestamp);
+        context.setStreamTime(timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, 1L);
         processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
-        assertThat(context.scheduledPunctuators(), hasSize(1));
-        context.scheduledPunctuators().get(0).getPunctuator().punctuate(1);
+        context.setRecordMetadata("topic", 0, 1, null, 1L);
+        context.setStreamTime(1L);
+        processor.process("tick", new Change<>(null, null));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -131,38 +136,49 @@ public void intermediateSuppressionShouldBufferAndEmitLater() {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
-
-    @SuppressWarnings("unchecked")
-    private <K extends Windowed> SuppressedImpl<K> finalResults(final Duration grace) {
-        return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace);
-    }
-
-
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
         final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
             finalResults(ofMillis(1L)),
-            timeWindowedSerdeFrom(String.class),
+            timeWindowedSerdeFrom(String.class, 1L),
             new FullChangeSerde<>(Long())
         );
 
         final MockInternalProcessorContext context = new MockInternalProcessorContext();
         processor.init(context);
 
-        final long timestamp = ARBITRARY_TIMESTAMP;
-        context.setRecordMetadata("topic", 0, 0, null, timestamp);
-        final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW);
+        final long windowStart = 99L;
+        final long recordTime = 99L;
+        final long windowEnd = 100L;
+        context.setRecordMetadata("topic", 0, 0, null, recordTime);
+        context.setStreamTime(recordTime);
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
-        assertThat(context.scheduledPunctuators(), hasSize(1));
-        context.scheduledPunctuators().get(0).getPunctuator().punctuate(timestamp + 1L);
+        // although the stream time is now 100, we have to wait 1 ms after the window *end* before we
+        // emit "hey", so we don't emit yet.
+        final long windowStart2 = 100L;
+        final long recordTime2 = 100L;
+        final long windowEnd2 = 101L;
+        context.setRecordMetadata("topic", 0, 1, null, recordTime2);
+        context.setStreamTime(recordTime2);
+        processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
+        assertThat(context.forwarded(), hasSize(0));
+
+        // ok, now it's time to emit "hey"
+        final long windowStart3 = 101L;
+        final long recordTime3 = 101L;
+        final long windowEnd3 = 102L;
+        context.setRecordMetadata("topic", 0, 1, null, recordTime3);
+        context.setStreamTime(recordTime3);
+        processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
         assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.timestamp(), is(recordTime));
     }
 
     /**
@@ -170,27 +186,32 @@ public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
      * it will still buffer events and emit only after the end of the window.
      * As opposed to emitting immediately the way regular suppression would with a time limit of 0.
      */
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
         final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
             finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class),
+            timeWindowedSerdeFrom(String.class, 100L),
             new FullChangeSerde<>(Long())
         );
 
         final MockInternalProcessorContext context = new MockInternalProcessorContext();
         processor.init(context);
 
+        // note the record is in the past, but the window end is in the future, so we still have to buffer,
+        // even though the grace period is 0.
         final long timestamp = 5L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        final long streamTime = 99L;
         final long windowEnd = 100L;
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setStreamTime(streamTime);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
-        assertThat(context.scheduledPunctuators(), hasSize(1));
-        context.scheduledPunctuators().get(0).getPunctuator().punctuate(windowEnd);
+        context.setRecordMetadata("", 0, 1L, null, windowEnd);
+        context.setStreamTime(windowEnd);
+        processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -202,7 +223,7 @@ public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
     public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
         final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
             finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class),
+            timeWindowedSerdeFrom(String.class, 100L),
             new FullChangeSerde<>(Long())
         );
 
@@ -210,7 +231,7 @@ public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
@@ -226,7 +247,7 @@ public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
     public void finalResultsShouldSuppressTombstonesForTimeWindows() {
         final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
             finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class),
+            timeWindowedSerdeFrom(String.class, 100L),
             new FullChangeSerde<>(Long())
         );
 
@@ -234,7 +255,7 @@ public void finalResultsShouldSuppressTombstonesForTimeWindows() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -255,7 +276,7 @@ public void finalResultsShouldSuppressTombstonesForSessionWindows() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -264,12 +285,11 @@ public void finalResultsShouldSuppressTombstonesForSessionWindows() {
         assertThat(context.forwarded(), hasSize(0));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void suppressShouldNotSuppressTombstonesForTimeWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<Windowed<String>, Long>(
-            (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)),
-            timeWindowedSerdeFrom(String.class),
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
+            timeWindowedSerdeFrom(String.class, 100L),
             new FullChangeSerde<>(Long())
         );
 
@@ -277,7 +297,7 @@ public void suppressShouldNotSuppressTombstonesForTimeWindows() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -289,11 +309,10 @@ public void suppressShouldNotSuppressTombstonesForTimeWindows() {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void suppressShouldNotSuppressTombstonesForSessionWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<Windowed<String>, Long>(
-            (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)),
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
             sessionWindowedSerdeFrom(String.class),
             new FullChangeSerde<>(Long())
         );
@@ -302,7 +321,7 @@ public void suppressShouldNotSuppressTombstonesForSessionWindows() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -314,11 +333,61 @@ public void suppressShouldNotSuppressTombstonesForSessionWindows() {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void suppressShouldNotSuppressTombstonesForKTable() {
-        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<String, Long>(
-            (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)),
+        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
+            Serdes.String(),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setStreamTime(timestamp);
+        final String key = "hey";
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @Test
+    public void suppressShouldEmitWhenOverRecordCapacity() {
+        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(Duration.ofDays(100), maxRecords(1))),
+            Serdes.String(),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setStreamTime(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        final String key = "hey";
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+        processor.process("dummyKey", value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @Test
+    public void suppressShouldEmitWhenOverByteCapacity() {
+        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(Duration.ofDays(100), maxBytes(60L))),
             Serdes.String(),
             new FullChangeSerde<>(Long())
         );
@@ -327,18 +396,82 @@ public void suppressShouldNotSuppressTombstonesForKTable() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
         context.setStreamTime(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
 
+        context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+        processor.process("dummyKey", value);
+
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
         assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
+    @Test
+    public void suppressShouldShutDownWhenOverRecordCapacity() {
+        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(Duration.ofDays(100), maxRecords(1).shutDownWhenFull())),
+            Serdes.String(),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setStreamTime(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setCurrentNode(new ProcessorNode("testNode"));
+        final String key = "hey";
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        context.setRecordMetadata("", 0, 1L, null, timestamp);
+        try {
+            processor.process("dummyKey", value);
+            fail("expected an exception");
+        } catch (final StreamsException e) {
+            assertThat(e.getMessage(), containsString("buffer exceeded its max capacity"));
+        }
+    }
+
+    @Test
+    public void suppressShouldShutDownWhenOverByteCapacity() {
+        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(Duration.ofDays(100), maxBytes(60L).shutDownWhenFull())),
+            Serdes.String(),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setStreamTime(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setCurrentNode(new ProcessorNode("testNode"));
+        final String key = "hey";
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        context.setRecordMetadata("", 0, 1L, null, timestamp);
+        try {
+            processor.process("dummyKey", value);
+            fail("expected an exception");
+        } catch (final StreamsException e) {
+            assertThat(e.getMessage(), containsString("buffer exceeded its max capacity"));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private <K extends Windowed> SuppressedInternal<K> finalResults(final Duration grace) {
+        return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace);
+    }
+
     private static <E> Matcher<Collection<E>> hasSize(final int i) {
         return new BaseMatcher<Collection<E>>() {
             @Override
@@ -359,7 +492,15 @@ public boolean matches(final Object item) {
         };
     }
 
-    private static <K> SuppressedImpl<K> getImpl(final Suppressed<K> suppressed) {
-        return (SuppressedImpl<K>) suppressed;
+    private static <K> SuppressedInternal<K> getImpl(final Suppressed<K> suppressed) {
+        return (SuppressedInternal<K>) suppressed;
+    }
+
+    private <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) {
+        final Serde<K> kSerde = Serdes.serdeFrom(rawType);
+        return new Serdes.WrapperSerde<>(
+            new TimeWindowedSerializer<>(kSerde.serializer()),
+            new TimeWindowedDeserializer<>(kSerde.deserializer(), windowSize)
+        );
     }
 }
\ No newline at end of file
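
The rewritten processor tests above pin down the final-results timing rule: a windowed result may be emitted only once stream time reaches the window end plus the grace period, which is why the record in the window ending at 100 with 1 ms of grace is held at stream time 100 and released at 101. The same arithmetic as a small self-contained sketch (the method name is illustrative, not the processor's API):

    // Mirrors the timing walked through in finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration.
    class FinalResultsTimingSketch {
        static boolean mayEmit(final long windowEnd, final long graceMs, final long streamTime) {
            return streamTime >= windowEnd + graceMs;
        }

        public static void main(final String[] args) {
            System.out.println(mayEmit(100L, 1L, 100L)); // false: still inside the grace period
            System.out.println(mayEmit(100L, 1L, 101L)); // true: 1 ms past the window end
            System.out.println(mayEmit(100L, 0L, 100L)); // true: zero grace emits right at the window end
        }
    }
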
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 0fdbdf76b3c..71a6ac21c10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -190,7 +190,7 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) {
 
         assertEquals(2, flushed.size());
         assertEquals(Bytes.wrap(new byte[] {0}), flushed.get(0).key());
-        assertEquals(headers, flushed.get(0).recordContext().headers());
+        assertEquals(headers, flushed.get(0).entry().context().headers());
         assertArrayEquals(new byte[] {10}, flushed.get(0).newValue());
         assertEquals(Bytes.wrap(new byte[] {2}), flushed.get(1).key());
         assertArrayEquals(new byte[] {30}, flushed.get(1).newValue());


 



> KIP-328: Add in-memory Suppression
> ----------------------------------
>
>                 Key: KAFKA-7223
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7223
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> As described in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
