This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new 4e365e3 MINOR: add log indicating the suppression time (#6260) 4e365e3 is described below commit 4e365e3fd490c8055a83db34beab2ad6978dd8fc Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Sun Feb 17 12:10:29 2019 -0600 MINOR: add log indicating the suppression time (#6260) Reviewer: Bill Bejeck <b...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../streams/kstream/internals/KTableImpl.java | 24 +++++++++++++------ .../suppress/FinalResultsSuppressionBuilder.java | 7 +++++- .../internals/suppress/NamedSuppressed.java | 28 ++++++++++++++++++++++ .../internals/suppress/SuppressedInternal.java | 3 ++- 4 files changed, 53 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index d4c1baf..6e65b89 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; -import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; @@ -40,10 +39,13 @@ import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode; import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder; import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor; +import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed; import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Collections; @@ -60,6 +62,7 @@ import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchU * @param <V> the value type */ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<K, V> { + private static final Logger LOG = LoggerFactory.getLogger(KTableImpl.class); static final String SOURCE_NAME = "KTABLE-SOURCE-"; @@ -342,10 +345,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @Override public KTable<K, V> suppress(final Suppressed<? super K> suppressed) { - final SuppressedInternal<K> suppressedInternal = buildSuppress(suppressed); + final String name; + if (suppressed instanceof NamedSuppressed) { + final String givenName = ((NamedSuppressed) suppressed).name(); + name = givenName != null ? givenName : builder.newProcessorName(SUPPRESS_NAME); + } else { + throw new IllegalArgumentException("Custom subclasses of Suppressed are not supported."); + } - final String name = - suppressedInternal.name() != null ? suppressedInternal.name() : builder.newProcessorName(SUPPRESS_NAME); + final SuppressedInternal<K> suppressedInternal = buildSuppress(suppressed, name); final String storeName = suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME); @@ -381,13 +389,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< } @SuppressWarnings("unchecked") - private SuppressedInternal<K> buildSuppress(final Suppressed<? super K> suppress) { + private SuppressedInternal<K> buildSuppress(final Suppressed<? super K> suppress, final String name) { if (suppress instanceof FinalResultsSuppressionBuilder) { final long grace = findAndVerifyWindowGrace(streamsGraphNode); + LOG.info("Using grace period of [{}] as the suppress duration for node [{}].", + Duration.ofMillis(grace), name); - final FinalResultsSuppressionBuilder<?> builder = (FinalResultsSuppressionBuilder) suppress; + final FinalResultsSuppressionBuilder<?> builder = (FinalResultsSuppressionBuilder<?>) suppress; - final SuppressedInternal<? extends Windowed> finalResultsSuppression = + final SuppressedInternal<?> finalResultsSuppression = builder.buildFinalResultsSuppression(Duration.ofMillis(grace)); return (SuppressedInternal<K>) finalResultsSuppression; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java index af1e0d2..e917556 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.Windowed; import java.time.Duration; import java.util.Objects; -public class FinalResultsSuppressionBuilder<K extends Windowed> implements Suppressed<K> { +public class FinalResultsSuppressionBuilder<K extends Windowed> implements Suppressed<K>, NamedSuppressed<K> { private final String name; private final StrictBufferConfig bufferConfig; @@ -60,6 +60,11 @@ public class FinalResultsSuppressionBuilder<K extends Windowed> implements Suppr } @Override + public String name() { + return name; + } + + @Override public int hashCode() { return Objects.hash(name, bufferConfig); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/NamedSuppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/NamedSuppressed.java new file mode 100644 index 0000000..78f6bd6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/NamedSuppressed.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.Suppressed; + +/** + * Internally-facing interface to work around the fact that all Suppressed config objects + * are name-able, but do not present a getter (for consistency with other config objects). + * If we allow getters on config objects in the future, we can delete this interface. + */ +public interface NamedSuppressed<K> extends Suppressed<K> { + String name(); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java index c387700..1540b2d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeD import java.time.Duration; import java.util.Objects; -public class SuppressedInternal<K> implements Suppressed<K> { +public class SuppressedInternal<K> implements Suppressed<K>, NamedSuppressed<K> { private static final Duration DEFAULT_SUPPRESSION_TIME = Duration.ofMillis(Long.MAX_VALUE); private static final StrictBufferConfigImpl DEFAULT_BUFFER_CONFIG = (StrictBufferConfigImpl) BufferConfig.unbounded(); @@ -62,6 +62,7 @@ public class SuppressedInternal<K> implements Suppressed<K> { return new SuppressedInternal<>(name, timeToWaitForMoreEvents, bufferConfig, timeDefinition, safeToDropTombstones); } + @Override public String name() { return name; }