This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 4e365e3  MINOR: add log indicating the suppression time (#6260)
4e365e3 is described below

commit 4e365e3fd490c8055a83db34beab2ad6978dd8fc
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Sun Feb 17 12:10:29 2019 -0600

    MINOR: add log indicating the suppression time (#6260)
    
    Reviewer: Bill Bejeck <b...@confluent.io>, Matthias J. Sax 
<matth...@confluent.io>
---
 .../streams/kstream/internals/KTableImpl.java      | 24 +++++++++++++------
 .../suppress/FinalResultsSuppressionBuilder.java   |  7 +++++-
 .../internals/suppress/NamedSuppressed.java        | 28 ++++++++++++++++++++++
 .../internals/suppress/SuppressedInternal.java     |  3 ++-
 4 files changed, 53 insertions(+), 9 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index d4c1baf..6e65b89 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
@@ -40,10 +39,13 @@ import 
org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
 import 
org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
 import 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
+import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
 import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Collections;
@@ -60,6 +62,7 @@ import static 
org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchU
  * @param <V> the value type
  */
 public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements 
KTable<K, V> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KTableImpl.class);
 
     static final String SOURCE_NAME = "KTABLE-SOURCE-";
 
@@ -342,10 +345,15 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
 
     @Override
     public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
-        final SuppressedInternal<K> suppressedInternal = 
buildSuppress(suppressed);
+        final String name;
+        if (suppressed instanceof NamedSuppressed) {
+            final String givenName = ((NamedSuppressed) suppressed).name();
+            name = givenName != null ? givenName : 
builder.newProcessorName(SUPPRESS_NAME);
+        } else {
+            throw new IllegalArgumentException("Custom subclasses of 
Suppressed are not supported.");
+        }
 
-        final String name =
-            suppressedInternal.name() != null ? suppressedInternal.name() : 
builder.newProcessorName(SUPPRESS_NAME);
+        final SuppressedInternal<K> suppressedInternal = 
buildSuppress(suppressed, name);
 
         final String storeName =
             suppressedInternal.name() != null ? suppressedInternal.name() + 
"-store" : builder.newStoreName(SUPPRESS_NAME);
@@ -381,13 +389,15 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
     }
 
     @SuppressWarnings("unchecked")
-    private SuppressedInternal<K> buildSuppress(final Suppressed<? super K> 
suppress) {
+    private SuppressedInternal<K> buildSuppress(final Suppressed<? super K> 
suppress, final String name) {
         if (suppress instanceof FinalResultsSuppressionBuilder) {
             final long grace = findAndVerifyWindowGrace(streamsGraphNode);
+            LOG.info("Using grace period of [{}] as the suppress duration for 
node [{}].",
+                     Duration.ofMillis(grace), name);
 
-            final FinalResultsSuppressionBuilder<?> builder = 
(FinalResultsSuppressionBuilder) suppress;
+            final FinalResultsSuppressionBuilder<?> builder = 
(FinalResultsSuppressionBuilder<?>) suppress;
 
-            final SuppressedInternal<? extends Windowed> 
finalResultsSuppression =
+            final SuppressedInternal<?> finalResultsSuppression =
                 builder.buildFinalResultsSuppression(Duration.ofMillis(grace));
 
             return (SuppressedInternal<K>) finalResultsSuppression;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
index af1e0d2..e917556 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import java.time.Duration;
 import java.util.Objects;
 
-public class FinalResultsSuppressionBuilder<K extends Windowed> implements 
Suppressed<K> {
+public class FinalResultsSuppressionBuilder<K extends Windowed> implements 
Suppressed<K>, NamedSuppressed<K> {
     private final String name;
     private final StrictBufferConfig bufferConfig;
 
@@ -60,6 +60,11 @@ public class FinalResultsSuppressionBuilder<K extends 
Windowed> implements Suppr
     }
 
     @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
     public int hashCode() {
         return Objects.hash(name, bufferConfig);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/NamedSuppressed.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/NamedSuppressed.java
new file mode 100644
index 0000000..78f6bd6
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/NamedSuppressed.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+
+/**
+ * Internally-facing interface to work around the fact that all Suppressed 
config objects
+ * are name-able, but do not present a getter (for consistency with other 
config objects).
+ * If we allow getters on config objects in the future, we can delete this 
interface.
+ */
+public interface NamedSuppressed<K> extends Suppressed<K> {
+    String name();
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
index c387700..1540b2d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
@@ -22,7 +22,7 @@ import 
org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeD
 import java.time.Duration;
 import java.util.Objects;
 
-public class SuppressedInternal<K> implements Suppressed<K> {
+public class SuppressedInternal<K> implements Suppressed<K>, 
NamedSuppressed<K> {
     private static final Duration DEFAULT_SUPPRESSION_TIME = 
Duration.ofMillis(Long.MAX_VALUE);
     private static final StrictBufferConfigImpl DEFAULT_BUFFER_CONFIG = 
(StrictBufferConfigImpl) BufferConfig.unbounded();
 
@@ -62,6 +62,7 @@ public class SuppressedInternal<K> implements Suppressed<K> {
         return new SuppressedInternal<>(name, timeToWaitForMoreEvents, 
bufferConfig, timeDefinition, safeToDropTombstones);
     }
 
+    @Override
     public String name() {
         return name;
     }

Reply via email to