This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new fa1702f MINOR: Remove deprecated valueTransformer.punctuate (#4993) fa1702f is described below commit fa1702fece04c5fc50149fc9b05d77a459b7180b Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Thu May 10 09:50:59 2018 -0700 MINOR: Remove deprecated valueTransformer.punctuate (#4993) Also removed the InternalValueTransformerWithKey / Supplier which is used to mock away the deprecated punctuate function. Reviewers: Matthias J. Sax <matth...@confluent.io> --- .../kafka/streams/kstream/ValueTransformer.java | 22 ----------- .../streams/kstream/internals/AbstractStream.java | 46 ++-------------------- .../internals/InternalValueTransformerWithKey.java | 24 ----------- .../InternalValueTransformerWithKeySupplier.java | 21 ---------- .../streams/kstream/internals/KStreamImpl.java | 10 ++--- .../kstream/internals/KStreamTransformValues.java | 10 +++-- .../kstream/internals/AbstractStreamTest.java | 35 ++++++++-------- .../internals/KStreamTransformValuesTest.java | 16 ++------ 8 files changed, 34 insertions(+), 150 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java index 1da779e..c0da38a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.To; /** @@ -85,27 +84,6 @@ public interface ValueTransformer<V, VR> { VR transform(final V value); /** - * Perform any periodic operations if this processor {@link ProcessorContext#schedule(long) schedule itself} with - * the context during {@link #init(ProcessorContext) initialization}. - * <p> - * It is not possible to return any new output records within {@code punctuate}. - * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)} - * will result in an {@link StreamsException exception}. - * Furthermore, {@code punctuate} must return {@code null}. - * <p> - * Note, that {@code punctuate} is called base on <it>stream time</it> (i.e., time progress with regard to - * timestamps return by the used {@link TimestampExtractor}) - * and not based on wall-clock time. - * - * @deprecated Please use {@link Punctuator} functional interface instead. - * - * @param timestamp the stream time when {@code punctuate} is being called - * @return must return {@code null}—otherwise, an {@link StreamsException exception} will be thrown - */ - @Deprecated - VR punctuate(final long timestamp); - - /** * Close this processor and clean up any resources. * <p> * It is not possible to return any new output records within {@code close()}. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index 497bdac..7bc7a15 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.kstream.ValueTransformerWithKey; @@ -84,19 +83,13 @@ public abstract class AbstractStream<K> { }; } - static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR> toInternalValueTransformerSupplier(final ValueTransformerSupplier<V, VR> valueTransformerSupplier) { + static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWithKeySupplier(final ValueTransformerSupplier<V, VR> valueTransformerSupplier) { Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - return new InternalValueTransformerWithKeySupplier<K, V, VR>() { + return new ValueTransformerWithKeySupplier<K, V, VR>() { @Override - public InternalValueTransformerWithKey<K, V, VR> get() { + public ValueTransformerWithKey<K, V, VR> get() { final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get(); - return new InternalValueTransformerWithKey<K, V, VR>() { - @SuppressWarnings("deprecation") - @Override - public VR punctuate(final long timestamp) { - return valueTransformer.punctuate(timestamp); - } - + return new ValueTransformerWithKey<K, V, VR>() { @Override public void init(final ProcessorContext context) { valueTransformer.init(context); @@ -115,35 +108,4 @@ public abstract class AbstractStream<K> { } }; } - - static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR> toInternalValueTransformerSupplier(final ValueTransformerWithKeySupplier<K, V, VR> valueTransformerWithKeySupplier) { - Objects.requireNonNull(valueTransformerWithKeySupplier, "valueTransformerSupplier can't be null"); - return new InternalValueTransformerWithKeySupplier<K, V, VR>() { - @Override - public InternalValueTransformerWithKey<K, V, VR> get() { - final ValueTransformerWithKey<K, V, VR> valueTransformerWithKey = valueTransformerWithKeySupplier.get(); - return new InternalValueTransformerWithKey<K, V, VR>() { - @Override - public VR punctuate(final long timestamp) { - throw new StreamsException("ValueTransformerWithKey#punctuate should not be called."); - } - - @Override - public void init(final ProcessorContext context) { - valueTransformerWithKey.init(context); - } - - @Override - public VR transform(final K readOnlyKey, final V value) { - return valueTransformerWithKey.transform(readOnlyKey, value); - } - - @Override - public void close() { - valueTransformerWithKey.close(); - } - }; - } - }; - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java deleted file mode 100644 index 636e409..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - - -import org.apache.kafka.streams.kstream.ValueTransformerWithKey; - -public interface InternalValueTransformerWithKey<K, V, VR> extends ValueTransformerWithKey<K, V, VR> { - VR punctuate(final long timestamp); -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java deleted file mode 100644 index 3418e71..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - -public interface InternalValueTransformerWithKeySupplier<K, V, VR> { - InternalValueTransformerWithKey<K, V, VR> get(); -} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 2ddd5ff..b8195a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -348,7 +348,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V final String... stateStoreNames) { Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null"); - return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames); + return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames); } @Override @@ -356,13 +356,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V final String... stateStoreNames) { Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null"); - return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames); + return doTransformValues(valueTransformerSupplier, stateStoreNames); } - private <VR> KStream<K, VR> transformValues(final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> internalValueTransformerWithKeySupplier, - final String... stateStoreNames) { + private <VR> KStream<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, + final String... stateStoreNames) { final String name = builder.newProcessorName(TRANSFORMVALUES_NAME); - builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(internalValueTransformerWithKeySupplier), this.name); + builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(valueTransformerWithKeySupplier), this.name); if (stateStoreNames != null && stateStoreNames.length > 0) { builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java index fb6af34..d45b7cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java @@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -35,9 +37,9 @@ import java.util.Map; public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> { - private final InternalValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier; + private final ValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier; - public KStreamTransformValues(final InternalValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier) { + KStreamTransformValues(final ValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier) { this.valueTransformerSupplier = valueTransformerSupplier; } @@ -48,10 +50,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> { - private final InternalValueTransformerWithKey<K, V, R> valueTransformer; + private final ValueTransformerWithKey<K, V, R> valueTransformer; private ProcessorContext context; - public KStreamTransformValuesProcessor(final InternalValueTransformerWithKey<K, V, R> valueTransformer) { + KStreamTransformValuesProcessor(final ValueTransformerWithKey<K, V, R> valueTransformer) { this.valueTransformer = valueTransformer; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java index 1f9bcba..a37b6f8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java @@ -47,29 +47,26 @@ public class AbstractStreamTest { @Test public void testToInternlValueTransformerSupplierSuppliesNewTransformers() { - final ValueTransformerSupplier vts = createMock(ValueTransformerSupplier.class); - expect(vts.get()).andReturn(null).times(3); - final InternalValueTransformerWithKeySupplier ivtwks = - AbstractStream.toInternalValueTransformerSupplier(vts); - replay(vts); - ivtwks.get(); - ivtwks.get(); - ivtwks.get(); - verify(vts); + final ValueTransformerSupplier valueTransformerSupplier = createMock(ValueTransformerSupplier.class); + expect(valueTransformerSupplier.get()).andReturn(null).times(3); + final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = + AbstractStream.toValueTransformerWithKeySupplier(valueTransformerSupplier); + replay(valueTransformerSupplier); + valueTransformerWithKeySupplier.get(); + valueTransformerWithKeySupplier.get(); + valueTransformerWithKeySupplier.get(); + verify(valueTransformerSupplier); } @Test public void testToInternalValueTransformerSupplierSuppliesNewTransformers() { - final ValueTransformerWithKeySupplier vtwks = - createMock(ValueTransformerWithKeySupplier.class); - expect(vtwks.get()).andReturn(null).times(3); - final InternalValueTransformerWithKeySupplier ivtwks = - AbstractStream.toInternalValueTransformerSupplier(vtwks); - replay(vtwks); - ivtwks.get(); - ivtwks.get(); - ivtwks.get(); - verify(vtwks); + final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = createMock(ValueTransformerWithKeySupplier.class); + expect(valueTransformerWithKeySupplier.get()).andReturn(null).times(3); + replay(valueTransformerWithKeySupplier); + valueTransformerWithKeySupplier.get(); + valueTransformerWithKeySupplier.get(); + valueTransformerWithKeySupplier.get(); + verify(valueTransformerWithKeySupplier); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java index 419e6f1..807fb1f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java @@ -69,11 +69,6 @@ public class KStreamTransformValuesTest { } @Override - public Integer punctuate(long timestamp) { - return null; - } - - @Override public void close() { } }; @@ -143,15 +138,10 @@ public class KStreamTransformValuesTest { @Test public void shouldNotAllowValueTransformerToCallInternalProcessorContextMethods() { final BadValueTransformer badValueTransformer = new BadValueTransformer(); - final KStreamTransformValues<Integer, Integer, Integer> transformValue = new KStreamTransformValues<>(new InternalValueTransformerWithKeySupplier<Integer, Integer, Integer>() { + final KStreamTransformValues<Integer, Integer, Integer> transformValue = new KStreamTransformValues<>(new ValueTransformerWithKeySupplier<Integer, Integer, Integer>() { @Override - public InternalValueTransformerWithKey<Integer, Integer, Integer> get() { - return new InternalValueTransformerWithKey<Integer, Integer, Integer>() { - @Override - public Integer punctuate(long timestamp) { - throw new StreamsException("ValueTransformerWithKey#punctuate should not be called."); - } - + public ValueTransformerWithKey<Integer, Integer, Integer> get() { + return new ValueTransformerWithKey<Integer, Integer, Integer>() { @Override public void init(final ProcessorContext context) { badValueTransformer.init(context); -- To stop receiving notification emails like this one, please contact guozh...@apache.org.