[GitHub] storm issue #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on the issue: https://github.com/apache/storm/pull/2790 @srdo Ready to merge! ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207761956 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -17,73 +17,140 @@ */ package org.apache.storm.kafka.bolt; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; + +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.storm.Testing; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.testing.MkTupleParam; import org.apache.storm.tuple.Tuple; import org.junit.Test; -import org.mockito.ArgumentMatcher; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaBoltTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class); - -@SuppressWarnings({ "unchecked", "serial" }) -@Test -public void testSimple() { -final KafkaProducer producer = mock(KafkaProducer.class); -when(producer.send(any(), any())).thenAnswer(new Answer() { -@Override -public Object answer(InvocationOnMock invocation) throws Throwable { -Callback c = (Callback)invocation.getArguments()[1]; -c.onCompletion(null, null); -return null; -} -}); -KafkaBolt bolt = new KafkaBolt() { + +private KafkaBolt makeBolt(Producer producer) { +KafkaBolt bolt = new KafkaBolt() { @Override -protected KafkaProducer mkProducer(Properties props) { +protected Producer mkProducer(Properties props) { return producer; } }; bolt.withTopicSelector("MY_TOPIC"); - + +return bolt; +} + +private Tuple createTestTuple(String... values) { +MkTupleParam param = new MkTupleParam(); +param.setFields("key", "message"); +return Testing.testTuple(Arrays.asList(values), param); +} + +@Test +public void testSimple() { +MockProducer producer = new MockProducer<>(Cluster.empty(), false, null, null, null); +KafkaBolt bolt = makeBolt(producer); + OutputCollector collector = mock(OutputCollector.class); TopologyContext context = mock(TopologyContext.class); Map conf = new HashMap<>(); bolt.prepare(conf, context, collector); -MkTupleParam param = new MkTupleParam(); -param.setFields("key", "message"); -Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), param); + +String key = "KEY"; +String value = "VALUE"; +Tuple testTuple = createTestTuple(key, value); bolt.execute(testTuple); -verify(producer).send(argThat(new ArgumentMatcher>() { -@Override -public boolean matches(ProducerRecord arg) { -LOG.info("GOT {} ->", arg); -LOG.info(" {} {} {}", arg.topic(), arg.key(), arg.value()); -return "MY_TOPIC".equals(arg.topic()) && -"KEY".equals(arg.key()) && -"VALUE".equals(arg.value()); -} -}), any(Callback.class)); + +assertThat(producer.history().size(), is(1)); +ProducerRecord arg = producer.history().get(0); + +LOG.info("GOT {} ->", arg); +LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value()); +assertThat(arg.topic(), is("MY_TOPI
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207711135 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -17,73 +17,140 @@ */ package org.apache.storm.kafka.bolt; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; + +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.storm.Testing; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.testing.MkTupleParam; import org.apache.storm.tuple.Tuple; import org.junit.Test; -import org.mockito.ArgumentMatcher; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaBoltTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class); - -@SuppressWarnings({ "unchecked", "serial" }) -@Test -public void testSimple() { -final KafkaProducer producer = mock(KafkaProducer.class); -when(producer.send(any(), any())).thenAnswer(new Answer() { -@Override -public Object answer(InvocationOnMock invocation) throws Throwable { -Callback c = (Callback)invocation.getArguments()[1]; -c.onCompletion(null, null); -return null; -} -}); -KafkaBolt bolt = new KafkaBolt() { + +private KafkaBolt makeBolt(Producer producer) { +KafkaBolt bolt = new KafkaBolt() { @Override -protected KafkaProducer mkProducer(Properties props) { +protected Producer mkProducer(Properties props) { return producer; } }; bolt.withTopicSelector("MY_TOPIC"); - + +return bolt; +} + +private Tuple createTestTuple(String... values) { +MkTupleParam param = new MkTupleParam(); +param.setFields("key", "message"); +return Testing.testTuple(Arrays.asList(values), param); +} + +@Test +public void testSimple() { +MockProducer producer = new MockProducer<>(Cluster.empty(), false, null, null, null); +KafkaBolt bolt = makeBolt(producer); + OutputCollector collector = mock(OutputCollector.class); TopologyContext context = mock(TopologyContext.class); Map conf = new HashMap<>(); bolt.prepare(conf, context, collector); -MkTupleParam param = new MkTupleParam(); -param.setFields("key", "message"); -Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), param); + +String key = "KEY"; +String value = "VALUE"; +Tuple testTuple = createTestTuple(key, value); bolt.execute(testTuple); -verify(producer).send(argThat(new ArgumentMatcher>() { -@Override -public boolean matches(ProducerRecord arg) { -LOG.info("GOT {} ->", arg); -LOG.info(" {} {} {}", arg.topic(), arg.key(), arg.value()); -return "MY_TOPIC".equals(arg.topic()) && -"KEY".equals(arg.key()) && -"VALUE".equals(arg.value()); -} -}), any(Callback.class)); + +assertThat(producer.history().size(), is(1)); +ProducerRecord arg = producer.history().get(0); + +LOG.info("GOT {} ->", arg); +LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value()); +assertThat(arg.topic(), is("MY_TOPI
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207708527 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -17,73 +17,140 @@ */ package org.apache.storm.kafka.bolt; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; + +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.storm.Testing; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.testing.MkTupleParam; import org.apache.storm.tuple.Tuple; import org.junit.Test; -import org.mockito.ArgumentMatcher; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaBoltTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class); - -@SuppressWarnings({ "unchecked", "serial" }) -@Test -public void testSimple() { -final KafkaProducer producer = mock(KafkaProducer.class); -when(producer.send(any(), any())).thenAnswer(new Answer() { -@Override -public Object answer(InvocationOnMock invocation) throws Throwable { -Callback c = (Callback)invocation.getArguments()[1]; -c.onCompletion(null, null); -return null; -} -}); -KafkaBolt bolt = new KafkaBolt() { + +private KafkaBolt makeBolt(Producer producer) { +KafkaBolt bolt = new KafkaBolt() { @Override -protected KafkaProducer mkProducer(Properties props) { +protected Producer mkProducer(Properties props) { return producer; } }; bolt.withTopicSelector("MY_TOPIC"); - + +return bolt; +} + +private Tuple createTestTuple(String... values) { +MkTupleParam param = new MkTupleParam(); +param.setFields("key", "message"); +return Testing.testTuple(Arrays.asList(values), param); +} + +@Test +public void testSimple() { +MockProducer producer = new MockProducer<>(Cluster.empty(), false, null, null, null); +KafkaBolt bolt = makeBolt(producer); + OutputCollector collector = mock(OutputCollector.class); TopologyContext context = mock(TopologyContext.class); Map conf = new HashMap<>(); bolt.prepare(conf, context, collector); -MkTupleParam param = new MkTupleParam(); -param.setFields("key", "message"); -Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), param); + +String key = "KEY"; +String value = "VALUE"; +Tuple testTuple = createTestTuple(key, value); bolt.execute(testTuple); -verify(producer).send(argThat(new ArgumentMatcher>() { -@Override -public boolean matches(ProducerRecord arg) { -LOG.info("GOT {} ->", arg); -LOG.info(" {} {} {}", arg.topic(), arg.key(), arg.value()); -return "MY_TOPIC".equals(arg.topic()) && -"KEY".equals(arg.key()) && -"VALUE".equals(arg.value()); -} -}), any(Callback.class)); + +assertThat(producer.history().size(), is(1)); +ProducerRecord arg = producer.history().get(0); + +LOG.info("GOT {} ->", arg); +LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value()); +assertThat(arg.topic(), is("MY_TOPI
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207649108 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -62,6 +62,7 @@ private OutputCollector collector; private TupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; +private Callback providedCallback; --- End diff -- What about something like this? Not sure if `OutputCollector` is necessary or not. ```java public interface PreparedCallback extends Callback, Serializable { void prepare(Map topoConf, TopologyContext context, OutputCollector outputCollector); } ``` So if the `PreparedCallback` is not null, then KafkaBolt would call `.prepare()` during `KafkaBolt.prepare()`. The one concern I have is that it might be open for abuse by tempting people to use it to inject logic into the bolt preparation that's unrelated to Kafka publishing. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207560866 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu return new KafkaProducer<>(props); } +/** + * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided. + */ +protected Callback mkProducerCallback(final Tuple input) { --- End diff -- Fair point. I'll change it to `private`. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207537361 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -62,6 +62,7 @@ private OutputCollector collector; private TupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; +private Callback providedCallback; --- End diff -- Ah, good point. The workaround I was using prior to this was all done after serialization so this wasn't something I had to worry about. Let me think about what that interface should look like and I'll come up with something. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207534991 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu return new KafkaProducer<>(props); } +/** + * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided. + */ +protected Callback mkProducerCallback(final Tuple input) { --- End diff -- I can change to `private`. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207524658 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -37,38 +39,48 @@ import org.apache.storm.tuple.Tuple; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.internal.util.MockUtil; --- End diff -- Nice, didn't know about that one. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207524275 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -37,38 +39,48 @@ import org.apache.storm.tuple.Tuple; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.internal.util.MockUtil; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaBoltTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class); - + @SuppressWarnings({ "unchecked", "serial" }) -@Test -public void testSimple() { -final KafkaProducer producer = mock(KafkaProducer.class); -when(producer.send(any(), any())).thenAnswer(new Answer() { +private KafkaBolt makeBolt(KafkaProducer mockProducer) { +Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer should be a mock object."); +when(mockProducer.send(any(), any())).thenAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { Callback c = (Callback)invocation.getArguments()[1]; c.onCompletion(null, null); return null; } }); -KafkaBolt bolt = new KafkaBolt() { +KafkaBolt bolt = new KafkaBolt() { @Override -protected KafkaProducer mkProducer(Properties props) { -return producer; +protected KafkaProducer mkProducer(Properties props) { +return mockProducer; } }; bolt.withTopicSelector("MY_TOPIC"); - + +return bolt; +} + +@SuppressWarnings({ "unchecked", "serial" }) +@Test +public void testSimple() { +final KafkaProducer producer = mock(KafkaProducer.class); --- End diff -- That's a possibility, though `KafkaBolt` has `KafkaProducer` as a private field so `MockProducer` can't be used. That could be fixed by changing that field to a `Producer` interface instead. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207428218 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu return new KafkaProducer<>(props); } +/** + * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided. + */ +protected Callback mkProducerCallback(final Tuple input) { +return (ignored, e) -> { +synchronized (collector) { +if (e != null) { +collector.reportError(e); +collector.fail(input); +} else { +collector.ack(input); +} + +// User defined Callback +if (providedCallback != null) { --- End diff -- The original behavior of the Callback is pretty important so we probably don't want a user-defined Callback to completely erase the original behavior. If the intent _**is**_ to totally redefine the Callback then one can simply subclass `KafkaBolt` and explicitly override `mkProducerCallback()`. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
GitHub user dfdemar opened a pull request: https://github.com/apache/storm/pull/2790 STORM-3175 - Allow usage of custom Callback. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dfdemar/storm custom-kafkabolt-callback Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2790.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2790 commit 8246a89a23a7e23c448b86ea97b81a1f93d06680 Author: David DeMar Date: 2018-08-03T02:37:17Z STORM-3175 - Allow usage of custom Callback. ---