Repository: kafka Updated Branches: refs/heads/trunk d691faf98 -> c1d8c3834
KAFKA-3449: Rename filterOut() to filterNot() to achieve better terminology â¦nology Hi all, This is my first contribution and I hope it will be good. The PR is related to this issue: https://issues.apache.org/jira/browse/KAFKA-3449 Thanks a lot, Andrea Author: Andrea Cosentino <[email protected]> Reviewers: Yasuhiro Matsuda, Guozhang Wang Closes #1134 from oscerd/KAFKA-3449 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1d8c383 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1d8c383 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1d8c383 Branch: refs/heads/trunk Commit: c1d8c38345e0a1e04ced143ed07e63fe02ceb8b0 Parents: d691faf Author: Andrea Cosentino <[email protected]> Authored: Fri Mar 25 15:00:45 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri Mar 25 15:00:45 2016 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/kafka/streams/kstream/KStream.java | 2 +- .../main/java/org/apache/kafka/streams/kstream/KTable.java | 2 +- .../kafka/streams/kstream/internals/KStreamFilter.java | 8 ++++---- .../apache/kafka/streams/kstream/internals/KStreamImpl.java | 2 +- .../apache/kafka/streams/kstream/internals/KTableFilter.java | 8 ++++---- .../apache/kafka/streams/kstream/internals/KTableImpl.java | 2 +- .../kafka/streams/kstream/internals/KStreamFilterTest.java | 4 ++-- .../kafka/streams/kstream/internals/KStreamImplTest.java | 2 +- .../kafka/streams/kstream/internals/KTableFilterTest.java | 4 ++-- 9 files changed, 17 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c1d8c383/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index c4188de..2313b8b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -43,7 +43,7 @@ public interface KStream<K, V> { * * @param predicate the instance of {@link Predicate} */ - KStream<K, V> filterOut(Predicate<K, V> predicate); + KStream<K, V> filterNot(Predicate<K, V> predicate); /** * Create a new instance of {@link KStream} by transforming each element in this stream into a different element in the new stream. http://git-wip-us.apache.org/repos/asf/kafka/blob/c1d8c383/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 9a2a8a8..30ea882 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -42,7 +42,7 @@ public interface KTable<K, V> { * * @param predicate the instance of {@link Predicate} */ - KTable<K, V> filterOut(Predicate<K, V> predicate); + KTable<K, V> filterNot(Predicate<K, V> predicate); /** * Create a new instance of {@link KTable} by transforming the value of each element in this stream into a new value in the new stream. http://git-wip-us.apache.org/repos/asf/kafka/blob/c1d8c383/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java index 0b1f1e0..f5c2fbc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java @@ -25,11 +25,11 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; class KStreamFilter<K, V> implements ProcessorSupplier<K, V> { private final Predicate<K, V> predicate; - private final boolean filterOut; + private final boolean filterNot; - public KStreamFilter(Predicate<K, V> predicate, boolean filterOut) { + public KStreamFilter(Predicate<K, V> predicate, boolean filterNot) { this.predicate = predicate; - this.filterOut = filterOut; + this.filterNot = filterNot; } @Override @@ -40,7 +40,7 @@ class KStreamFilter<K, V> implements ProcessorSupplier<K, V> { private class KStreamFilterProcessor extends AbstractProcessor<K, V> { @Override public void process(K key, V value) { - if (filterOut ^ predicate.test(key, value)) { + if (filterNot ^ predicate.test(key, value)) { context().forward(key, value); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c1d8c383/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 567b06c..5889e07 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -106,7 +106,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override - public KStream<K, V> filterOut(final Predicate<K, V> predicate) { + public KStream<K, V> filterNot(final Predicate<K, V> predicate) { String name = topology.newName(FILTER_NAME); topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name); http://git-wip-us.apache.org/repos/asf/kafka/blob/c1d8c383/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index 72f1d88..080fd9d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -26,14 +26,14 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { private final KTableImpl<K, ?, V> parent; private final Predicate<K, V> predicate; - private final boolean filterOut; + private final boolean filterNot; private boolean sendOldValues = false; - public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<K, V> predicate, boolean filterOut) { + public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<K, V> predicate, boolean filterNot) { this.parent = parent; this.predicate = predicate; - this.filterOut = filterOut; + this.filterNot = filterNot; } @Override @@ -64,7 +64,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { private V computeValue(K key, V value) { V newValue = null; - if (value != null && (filterOut ^ predicate.test(key, value))) + if (value != null && (filterNot ^ predicate.test(key, value))) newValue = value; return newValue; http://git-wip-us.apache.org/repos/asf/kafka/blob/c1d8c383/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index ca1e659..fd464a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -111,7 +111,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } @Override - public KTable<K, V> filterOut(final Predicate<K, V> predicate) { + public KTable<K, V> filterNot(final Predicate<K, V> predicate) { String name = topology.newName(FILTER_NAME); KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, true); http://git-wip-us.apache.org/repos/asf/kafka/blob/c1d8c383/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java index ecf1115..75465c8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java @@ -60,7 +60,7 @@ public class KStreamFilterTest { } @Test - public void testFilterOut() { + public void testFilterNot() { KStreamBuilder builder = new KStreamBuilder(); final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7}; @@ -69,7 +69,7 @@ public class KStreamFilterTest { processor = new MockProcessorSupplier<>(); stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName); - stream.filterOut(isMultipleOfThree).process(processor); + stream.filterNot(isMultipleOfThree).process(processor); KStreamTestDriver driver = new KStreamTestDriver(builder); for (int i = 0; i < expectedKeys.length; i++) { http://git-wip-us.apache.org/repos/asf/kafka/blob/c1d8c383/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 38182bc..b5c3d47 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -52,7 +52,7 @@ public class KStreamImplTest { public boolean test(String key, String value) { return true; } - }).filterOut(new Predicate<String, String>() { + }).filterNot(new Predicate<String, String>() { @Override public boolean test(String key, String value) { return false; http://git-wip-us.apache.org/repos/asf/kafka/blob/c1d8c383/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index 5491ea3..78d274e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -53,7 +53,7 @@ public class KTableFilterTest { return (value % 2) == 0; } }); - KTable<String, Integer> table3 = table1.filterOut(new Predicate<String, Integer>() { + KTable<String, Integer> table3 = table1.filterNot(new Predicate<String, Integer>() { @Override public boolean test(String key, Integer value) { return (value % 2) == 0; @@ -95,7 +95,7 @@ public class KTableFilterTest { return (value % 2) == 0; } }); - KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterOut( + KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot( new Predicate<String, Integer>() { @Override public boolean test(String key, Integer value) {
