This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new b328fc7  MINOR: add equals()/hashCode() for Produced/Consumed (#4979)
b328fc7 is described below

commit b328fc729b79fee6a9d210cf25890068d92943ee
Author: dan norwood <dan.norw...@gmail.com>
AuthorDate: Tue May 8 16:07:33 2018 -0700

    MINOR: add equals()/hashCode() for Produced/Consumed (#4979)

    Reviewer: Matthias J. Sax <matth...@confluent.io>
---
 .../java/org/apache/kafka/streams/Consumed.java    | 22 ++++++++++++++++++++++
 .../org/apache/kafka/streams/kstream/Produced.java | 21 +++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/Consumed.java b/streams/src/main/java/org/apache/kafka/streams/Consumed.java
index 78d6b24..37d30a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Consumed.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
+import java.util.Objects;
+
 /**
  * The {@code Consumed} class is used to define the optional parameters when using {@link StreamsBuilder} to
  * build instances of {@link KStream}, {@link KTable}, and {@link GlobalKTable}.
@@ -174,4 +176,24 @@ public class Consumed<K, V> {
         this.resetPolicy = resetPolicy;
         return this;
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final Consumed<?, ?> consumed = (Consumed<?, ?>) o;
+        return Objects.equals(keySerde, consumed.keySerde) &&
+               Objects.equals(valueSerde, consumed.valueSerde) &&
+               Objects.equals(timestampExtractor, consumed.timestampExtractor) &&
+               resetPolicy == consumed.resetPolicy;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(keySerde, valueSerde, timestampExtractor, resetPolicy);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
index b2513ea..8d2742a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
+import java.util.Objects;
+
 /**
  * This class is used to provide the optional parameters when producing to new topics
  * using {@link KStream#through(String, Produced)} or {@link KStream#to(String, Produced)}.
@@ -154,4 +156,23 @@ public class Produced<K, V> {
         this.keySerde = keySerde;
         return this;
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final Produced<?, ?> produced = (Produced<?, ?>) o;
+        return Objects.equals(keySerde, produced.keySerde) &&
+               Objects.equals(valueSerde, produced.valueSerde) &&
+               Objects.equals(partitioner, produced.partitioner);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(keySerde, valueSerde, partitioner);
+    }
 }
-- 
To stop receiving notification emails like this one, please contact mj...@apache.org.