[GitHub] storm issue #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-06 Thread dfdemar
Github user dfdemar commented on the issue:

https://github.com/apache/storm/pull/2790
  
@srdo Ready to merge!


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-05 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207761956
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -17,73 +17,140 @@
  */
 package org.apache.storm.kafka.bolt;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.storm.Testing;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.MkTupleParam;
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
-
-@SuppressWarnings({ "unchecked", "serial" })
-@Test
-public void testSimple() {
-final KafkaProducer producer = 
mock(KafkaProducer.class);
-when(producer.send(any(), any())).thenAnswer(new Answer() {
-@Override
-public Object answer(InvocationOnMock invocation) throws 
Throwable {
-Callback c = (Callback)invocation.getArguments()[1];
-c.onCompletion(null, null);
-return null;
-}
-});
-KafkaBolt bolt = new KafkaBolt() {
+
+private  KafkaBolt makeBolt(Producer producer) {
+KafkaBolt bolt = new KafkaBolt() {
 @Override
-protected KafkaProducer mkProducer(Properties 
props) {
+protected Producer mkProducer(Properties props) {
 return producer;
 }
 };
 bolt.withTopicSelector("MY_TOPIC");
-
+
+return bolt;
+}
+
+private Tuple createTestTuple(String... values) {
+MkTupleParam param = new MkTupleParam();
+param.setFields("key", "message");
+return Testing.testTuple(Arrays.asList(values), param);
+}
+
+@Test
+public void testSimple() {
+MockProducer producer = new 
MockProducer<>(Cluster.empty(), false, null, null, null);
+KafkaBolt bolt = makeBolt(producer);
+
 OutputCollector collector = mock(OutputCollector.class);
 TopologyContext context = mock(TopologyContext.class);
 Map conf = new HashMap<>();
 bolt.prepare(conf, context, collector);
-MkTupleParam param = new MkTupleParam();
-param.setFields("key", "message");
-Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), 
param);
+
+String key = "KEY";
+String value = "VALUE";
+Tuple testTuple = createTestTuple(key, value);
 bolt.execute(testTuple);
-verify(producer).send(argThat(new 
ArgumentMatcher>() {
-@Override
-public boolean matches(ProducerRecord arg) {
-LOG.info("GOT {} ->", arg);
-LOG.info("  {} {} {}", arg.topic(), arg.key(), 
arg.value());
-return "MY_TOPIC".equals(arg.topic()) &&
-"KEY".equals(arg.key()) &&
-"VALUE".equals(arg.value());
-}
-}), any(Callback.class));
+
+assertThat(producer.history().size(), is(1));
+ProducerRecord arg = producer.history().get(0);
+
+LOG.info("GOT {} ->", arg);
+LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
+assertThat(arg.topic(), is("MY_TOPI

[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-04 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207711135
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -17,73 +17,140 @@
  */
 package org.apache.storm.kafka.bolt;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.storm.Testing;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.MkTupleParam;
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
-
-@SuppressWarnings({ "unchecked", "serial" })
-@Test
-public void testSimple() {
-final KafkaProducer producer = 
mock(KafkaProducer.class);
-when(producer.send(any(), any())).thenAnswer(new Answer() {
-@Override
-public Object answer(InvocationOnMock invocation) throws 
Throwable {
-Callback c = (Callback)invocation.getArguments()[1];
-c.onCompletion(null, null);
-return null;
-}
-});
-KafkaBolt bolt = new KafkaBolt() {
+
+private  KafkaBolt makeBolt(Producer producer) {
+KafkaBolt bolt = new KafkaBolt() {
 @Override
-protected KafkaProducer mkProducer(Properties 
props) {
+protected Producer mkProducer(Properties props) {
 return producer;
 }
 };
 bolt.withTopicSelector("MY_TOPIC");
-
+
+return bolt;
+}
+
+private Tuple createTestTuple(String... values) {
+MkTupleParam param = new MkTupleParam();
+param.setFields("key", "message");
+return Testing.testTuple(Arrays.asList(values), param);
+}
+
+@Test
+public void testSimple() {
+MockProducer producer = new 
MockProducer<>(Cluster.empty(), false, null, null, null);
+KafkaBolt bolt = makeBolt(producer);
+
 OutputCollector collector = mock(OutputCollector.class);
 TopologyContext context = mock(TopologyContext.class);
 Map conf = new HashMap<>();
 bolt.prepare(conf, context, collector);
-MkTupleParam param = new MkTupleParam();
-param.setFields("key", "message");
-Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), 
param);
+
+String key = "KEY";
+String value = "VALUE";
+Tuple testTuple = createTestTuple(key, value);
 bolt.execute(testTuple);
-verify(producer).send(argThat(new 
ArgumentMatcher>() {
-@Override
-public boolean matches(ProducerRecord arg) {
-LOG.info("GOT {} ->", arg);
-LOG.info("  {} {} {}", arg.topic(), arg.key(), 
arg.value());
-return "MY_TOPIC".equals(arg.topic()) &&
-"KEY".equals(arg.key()) &&
-"VALUE".equals(arg.value());
-}
-}), any(Callback.class));
+
+assertThat(producer.history().size(), is(1));
+ProducerRecord arg = producer.history().get(0);
+
+LOG.info("GOT {} ->", arg);
+LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
+assertThat(arg.topic(), is("MY_TOPI

[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-04 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207708527
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -17,73 +17,140 @@
  */
 package org.apache.storm.kafka.bolt;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.storm.Testing;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.MkTupleParam;
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
-
-@SuppressWarnings({ "unchecked", "serial" })
-@Test
-public void testSimple() {
-final KafkaProducer producer = 
mock(KafkaProducer.class);
-when(producer.send(any(), any())).thenAnswer(new Answer() {
-@Override
-public Object answer(InvocationOnMock invocation) throws 
Throwable {
-Callback c = (Callback)invocation.getArguments()[1];
-c.onCompletion(null, null);
-return null;
-}
-});
-KafkaBolt bolt = new KafkaBolt() {
+
+private  KafkaBolt makeBolt(Producer producer) {
+KafkaBolt bolt = new KafkaBolt() {
 @Override
-protected KafkaProducer mkProducer(Properties 
props) {
+protected Producer mkProducer(Properties props) {
 return producer;
 }
 };
 bolt.withTopicSelector("MY_TOPIC");
-
+
+return bolt;
+}
+
+private Tuple createTestTuple(String... values) {
+MkTupleParam param = new MkTupleParam();
+param.setFields("key", "message");
+return Testing.testTuple(Arrays.asList(values), param);
+}
+
+@Test
+public void testSimple() {
+MockProducer producer = new 
MockProducer<>(Cluster.empty(), false, null, null, null);
+KafkaBolt bolt = makeBolt(producer);
+
 OutputCollector collector = mock(OutputCollector.class);
 TopologyContext context = mock(TopologyContext.class);
 Map conf = new HashMap<>();
 bolt.prepare(conf, context, collector);
-MkTupleParam param = new MkTupleParam();
-param.setFields("key", "message");
-Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), 
param);
+
+String key = "KEY";
+String value = "VALUE";
+Tuple testTuple = createTestTuple(key, value);
 bolt.execute(testTuple);
-verify(producer).send(argThat(new 
ArgumentMatcher>() {
-@Override
-public boolean matches(ProducerRecord arg) {
-LOG.info("GOT {} ->", arg);
-LOG.info("  {} {} {}", arg.topic(), arg.key(), 
arg.value());
-return "MY_TOPIC".equals(arg.topic()) &&
-"KEY".equals(arg.key()) &&
-"VALUE".equals(arg.value());
-}
-}), any(Callback.class));
+
+assertThat(producer.history().size(), is(1));
+ProducerRecord arg = producer.history().get(0);
+
+LOG.info("GOT {} ->", arg);
+LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
+assertThat(arg.topic(), is("MY_TOPI

[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207649108
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

What about something like this? Not sure if `OutputCollector` is necessary 
or not.

```java
public interface PreparedCallback extends Callback, Serializable {
void prepare(Map topoConf, TopologyContext context, 
OutputCollector outputCollector);
}
```

So if the `PreparedCallback` is not null, then KafkaBolt would call 
`.prepare()` during `KafkaBolt.prepare()`.

The one concern I have is that it might be open for abuse by tempting 
people to use it to inject logic into the bolt preparation that's unrelated to 
Kafka publishing.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207560866
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, 
TopologyContext context, Outpu
 return new KafkaProducer<>(props);
 }
 
+/**
+ * Make the producer Callback. Using this Callback will also execute 
the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
--- End diff --

Fair point. I'll change it to `private`.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207537361
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

Ah, good point. The workaround I was using prior to this was all done after 
serialization so this wasn't something I had to worry about. Let me think about 
what that interface should look like and I'll come up with something.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207534991
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, 
TopologyContext context, Outpu
 return new KafkaProducer<>(props);
 }
 
+/**
+ * Make the producer Callback. Using this Callback will also execute 
the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
--- End diff --

I can change to `private`.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207524658
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
--- End diff --

Nice, didn't know about that one.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207524275
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
-
+
 @SuppressWarnings({ "unchecked", "serial" })
-@Test
-public void testSimple() {
-final KafkaProducer producer = 
mock(KafkaProducer.class);
-when(producer.send(any(), any())).thenAnswer(new Answer() {
+private  KafkaBolt makeBolt(KafkaProducer 
mockProducer) {
+Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer 
should be a mock object.");
+when(mockProducer.send(any(), any())).thenAnswer(new 
Answer() {
 @Override
 public Object answer(InvocationOnMock invocation) throws 
Throwable {
 Callback c = (Callback)invocation.getArguments()[1];
 c.onCompletion(null, null);
 return null;
 }
 });
-KafkaBolt bolt = new KafkaBolt() {
+KafkaBolt bolt = new KafkaBolt() {
 @Override
-protected KafkaProducer mkProducer(Properties 
props) {
-return producer;
+protected KafkaProducer mkProducer(Properties props) {
+return mockProducer;
 }
 };
 bolt.withTopicSelector("MY_TOPIC");
-
+
+return bolt;
+}
+
+@SuppressWarnings({ "unchecked", "serial" })
+@Test
+public void testSimple() {
+final KafkaProducer producer = 
mock(KafkaProducer.class);
--- End diff --

That's a possibility, though `KafkaBolt` has `KafkaProducer` as a private 
field so `MockProducer` can't be used. That could be fixed by changing that 
field to a `Producer` interface instead.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-02 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207428218
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, 
TopologyContext context, Outpu
 return new KafkaProducer<>(props);
 }
 
+/**
+ * Make the producer Callback. Using this Callback will also execute 
the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
+return (ignored, e) -> {
+synchronized (collector) {
+if (e != null) {
+collector.reportError(e);
+collector.fail(input);
+} else {
+collector.ack(input);
+}
+
+// User defined Callback
+if (providedCallback != null) {
--- End diff --

The original behavior of the Callback is pretty important so we probably 
don't want a user-defined Callback to completely erase the original behavior. 
If the intent _**is**_ to totally redefine the Callback then one can simply 
subclass `KafkaBolt` and explicitly override `mkProducerCallback()`.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-02 Thread dfdemar
GitHub user dfdemar opened a pull request:

https://github.com/apache/storm/pull/2790

STORM-3175 - Allow usage of custom Callback.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dfdemar/storm custom-kafkabolt-callback

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2790.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2790


commit 8246a89a23a7e23c448b86ea97b81a1f93d06680
Author: David DeMar 
Date:   2018-08-03T02:37:17Z

STORM-3175 - Allow usage of custom Callback.




---