This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b328fc7  MINOR: add equals()/hashCode() for Produced/Consumed (#4979)
b328fc7 is described below

commit b328fc729b79fee6a9d210cf25890068d92943ee
Author: dan norwood <dan.norw...@gmail.com>
AuthorDate: Tue May 8 16:07:33 2018 -0700

    MINOR: add equals()/hashCode() for Produced/Consumed (#4979)
    
    Reviewer: Matthias J. Sax <matth...@confluent.io>
---
 .../java/org/apache/kafka/streams/Consumed.java    | 22 ++++++++++++++++++++++
 .../org/apache/kafka/streams/kstream/Produced.java | 21 +++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/Consumed.java 
b/streams/src/main/java/org/apache/kafka/streams/Consumed.java
index 78d6b24..37d30a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Consumed.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
+import java.util.Objects;
+
 /**
  * The {@code Consumed} class is used to define the optional parameters when 
using {@link StreamsBuilder} to
  * build instances of {@link KStream}, {@link KTable}, and {@link 
GlobalKTable}.
@@ -174,4 +176,24 @@ public class Consumed<K, V> {
         this.resetPolicy = resetPolicy;
         return this;
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final Consumed<?, ?> consumed = (Consumed<?, ?>) o;
+        return Objects.equals(keySerde, consumed.keySerde) &&
+               Objects.equals(valueSerde, consumed.valueSerde) &&
+               Objects.equals(timestampExtractor, consumed.timestampExtractor) 
&&
+               resetPolicy == consumed.resetPolicy;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(keySerde, valueSerde, timestampExtractor, 
resetPolicy);
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
index b2513ea..8d2742a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
@@ -22,6 +22,8 @@ import 
org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
+import java.util.Objects;
+
 /**
  * This class is used to provide the optional parameters when producing to new 
topics
  * using {@link KStream#through(String, Produced)} or {@link 
KStream#to(String, Produced)}.
@@ -154,4 +156,23 @@ public class Produced<K, V> {
         this.keySerde = keySerde;
         return this;
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final Produced<?, ?> produced = (Produced<?, ?>) o;
+        return Objects.equals(keySerde, produced.keySerde) &&
+               Objects.equals(valueSerde, produced.valueSerde) &&
+               Objects.equals(partitioner, produced.partitioner);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(keySerde, valueSerde, partitioner);
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mj...@apache.org.

Reply via email to