[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r208577649

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/PreparableCallback.java ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.bolt;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Serializable callback for use with the KafkaProducer on KafkaBolt.
+ */
+public interface PreparableCallback extends Callback, Serializable {
+    void prepare(Map topoConf, TopologyContext context);
--- End diff --

I'm a little unsure what you're asking. It's an extension of the Kafka Callback interface, which adds the prepare method that fits Storm.

---
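For readers following the thread, the shape of the interface under discussion lends itself to a short sketch. Below, the Kafka `Callback` and Storm `TopologyContext` types are replaced with minimal stand-ins so the example is self-contained; the implementation class and its names are illustrative, not part of the PR:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class PreparableCallbackDemo {

    // Minimal stand-ins for org.apache.kafka.clients.producer.Callback and
    // org.apache.storm.task.TopologyContext (illustrative only).
    interface Callback { void onCompletion(Object metadata, Exception exception); }
    static class TopologyContext { }

    // The pattern discussed in this PR: a Kafka producer callback that is
    // Serializable and gets a Storm-style prepare() hook.
    interface PreparableCallback extends Callback, Serializable {
        void prepare(Map topoConf, TopologyContext context);
    }

    // Hypothetical user implementation: counts failed sends. The counter is
    // transient and rebuilt in prepare(), since it should not be serialized
    // along with the topology.
    static class FailureCountingCallback implements PreparableCallback {
        private transient AtomicLong failures;

        @Override
        public void prepare(Map topoConf, TopologyContext context) {
            failures = new AtomicLong();
        }

        @Override
        public void onCompletion(Object metadata, Exception exception) {
            if (exception != null) {
                failures.incrementAndGet();
            }
        }

        long failureCount() {
            return failures.get();
        }
    }

    static long simulate() {
        FailureCountingCallback cb = new FailureCountingCallback();
        cb.prepare(new HashMap(), new TopologyContext());
        cb.onCompletion(new Object(), null);                  // successful send
        cb.onCompletion(null, new RuntimeException("boom"));  // failed send
        return cb.failureCount();
    }

    public static void main(String[] args) {
        System.out.println("failures: " + simulate());
    }
}
```

The transient-field-plus-prepare split mirrors how Storm bolts themselves separate serialized configuration from per-worker runtime state.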
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user danny0405 commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207771586

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/PreparableCallback.java --- (same diff as quoted above) ---
--- End diff --

Why do we name it PreparableCallback if it is used just for KafkaBolt? Is there any reusable class in core?

---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2790 ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207761956

--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java ---
@@ -17,73 +17,140 @@
  */
 package org.apache.storm.kafka.bolt;

+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;

 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.storm.Testing;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.MkTupleParam;
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 public class KafkaBoltTest {

     private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class);
-
-    @SuppressWarnings({ "unchecked", "serial" })
-    @Test
-    public void testSimple() {
-        final KafkaProducer producer = mock(KafkaProducer.class);
-        when(producer.send(any(), any())).thenAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                Callback c = (Callback) invocation.getArguments()[1];
-                c.onCompletion(null, null);
-                return null;
-            }
-        });
-        KafkaBolt bolt = new KafkaBolt() {
+
+    private KafkaBolt makeBolt(Producer producer) {
+        KafkaBolt bolt = new KafkaBolt() {
             @Override
-            protected KafkaProducer mkProducer(Properties props) {
+            protected Producer mkProducer(Properties props) {
                 return producer;
             }
         };
         bolt.withTopicSelector("MY_TOPIC");
-
+
+        return bolt;
+    }
+
+    private Tuple createTestTuple(String... values) {
+        MkTupleParam param = new MkTupleParam();
+        param.setFields("key", "message");
+        return Testing.testTuple(Arrays.asList(values), param);
+    }
+
+    @Test
+    public void testSimple() {
+        MockProducer producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
+        KafkaBolt bolt = makeBolt(producer);
+
         OutputCollector collector = mock(OutputCollector.class);
         TopologyContext context = mock(TopologyContext.class);
         Map conf = new HashMap<>();
         bolt.prepare(conf, context, collector);
-        MkTupleParam param = new MkTupleParam();
-        param.setFields("key", "message");
-        Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), param);
+
+        String key = "KEY";
+        String value = "VALUE";
+        Tuple testTuple = createTestTuple(key, value);
         bolt.execute(testTuple);
-        verify(producer).send(argThat(new ArgumentMatcher<ProducerRecord>() {
-            @Override
-            public boolean matches(ProducerRecord arg) {
-                LOG.info("GOT {} ->", arg);
-                LOG.info(" {} {} {}", arg.topic(), arg.key(), arg.value());
-                return "MY_TOPIC".equals(arg.topic()) &&
-                    "KEY".equals(arg.key()) &&
-                    "VALUE".equals(arg.value());
-            }
-        }), any(Callback.class));
+
+        assertThat(producer.history().size(), is(1));
+        ProducerRecord arg = producer.history().get(0);
+
+        LOG.info("GOT {} ->", arg);
+        LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
+        assertThat(arg.topic(), is("MY_TOPIC"));
+        assertThat(arg.key(), is(key));
+        assertThat(arg.value(), is(value));
+
+        // Complete the send
+        producer.completeNext();
         verify(collector).ack(testTuple);
     }
+
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207711135

--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- (same diff as quoted above) ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207709023

--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- (same diff as quoted above) ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207708527

--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- (same diff as quoted above) ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207706595

--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- (same diff as quoted above) ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207671937

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -62,6 +62,7 @@
     private OutputCollector collector;
     private TupleToKafkaMapper mapper;
     private KafkaTopicSelector topicSelector;
+    private Callback providedCallback;
--- End diff --

That looks good to me. I'm also not sure about the collector, but I'd maybe lean toward not including it. The callback won't have access to incoming tuples, so the only use would be if someone wanted the callback to emit a new unanchored tuple.

I wouldn't worry about people misusing the prepare method. They can just as easily override the bolt's prepare method.

---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207649108

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -62,6 +62,7 @@
     private OutputCollector collector;
     private TupleToKafkaMapper mapper;
     private KafkaTopicSelector topicSelector;
+    private Callback providedCallback;
--- End diff --

What about something like this? Not sure if `OutputCollector` is necessary or not.

```java
public interface PreparedCallback extends Callback, Serializable {
    void prepare(Map topoConf, TopologyContext context, OutputCollector outputCollector);
}
```

So if the `PreparedCallback` is not null, then KafkaBolt would call `.prepare()` during `KafkaBolt.prepare()`. The one concern I have is that it might be open for abuse by tempting people to use it to inject logic into the bolt preparation that's unrelated to Kafka publishing.

---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207560866

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu
         return new KafkaProducer<>(props);
     }

+    /**
+     * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided.
+     */
+    protected Callback mkProducerCallback(final Tuple input) {
--- End diff --

Fair point. I'll change it to `private`.

---
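The behavior `mkProducerCallback` is described as having (ack or fail the tuple, then delegate to the user-supplied callback if one was configured) can be sketched with stand-in types. Everything below is illustrative, not the PR's actual code; the Kafka `Callback` and Storm `Tuple`/`OutputCollector` types are replaced with minimal local versions:

```java
import java.util.ArrayList;
import java.util.List;

public class ProducerCallbackDemo {

    // Minimal stand-ins for the Kafka and Storm types (illustrative only).
    interface Callback { void onCompletion(Object metadata, Exception exception); }
    static class Tuple { }
    static class OutputCollector {
        final List<Tuple> acked = new ArrayList<>();
        final List<Tuple> failed = new ArrayList<>();
        void ack(Tuple t) { acked.add(t); }
        void fail(Tuple t) { failed.add(t); }
    }

    // Sketch of the wrapper under discussion: handle the tuple lifecycle
    // first, then delegate to the user-provided callback if present.
    static Callback mkProducerCallback(Tuple input, OutputCollector collector, Callback userCallback) {
        return (metadata, exception) -> {
            if (exception == null) {
                collector.ack(input);
            } else {
                collector.fail(input);
            }
            if (userCallback != null) {
                userCallback.onCompletion(metadata, exception);
            }
        };
    }

    static boolean demo() {
        OutputCollector collector = new OutputCollector();
        List<String> userCalls = new ArrayList<>();
        Tuple tuple = new Tuple();

        Callback cb = mkProducerCallback(tuple, collector,
            (metadata, exception) -> userCalls.add(exception == null ? "ok" : "err"));

        cb.onCompletion(new Object(), null);   // simulate a successful send
        return collector.acked.size() == 1
            && collector.failed.isEmpty()
            && userCalls.size() == 1
            && userCalls.get(0).equals("ok");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the factory method private, as agreed above, means the wrapping order (ack/fail before user callback) stays an implementation detail of the bolt.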
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207537361

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -62,6 +62,7 @@
     private OutputCollector collector;
     private TupleToKafkaMapper mapper;
     private KafkaTopicSelector topicSelector;
+    private Callback providedCallback;
--- End diff --

Ah, good point. The workaround I was using prior to this was all done after serialization, so this wasn't something I had to worry about. Let me think about what that interface should look like and I'll come up with something.

---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207534991

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu
         return new KafkaProducer<>(props);
     }

+    /**
+     * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided.
+     */
+    protected Callback mkProducerCallback(final Tuple input) {
--- End diff --

I can change to `private`.

---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207526335

--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 public class KafkaBoltTest {

     private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class);
-
+
     @SuppressWarnings({ "unchecked", "serial" })
-    @Test
-    public void testSimple() {
-        final KafkaProducer producer = mock(KafkaProducer.class);
-        when(producer.send(any(), any())).thenAnswer(new Answer() {
+    private KafkaBolt makeBolt(KafkaProducer mockProducer) {
+        Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer should be a mock object.");
+        when(mockProducer.send(any(), any())).thenAnswer(new Answer() {
             @Override
             public Object answer(InvocationOnMock invocation) throws Throwable {
                 Callback c = (Callback) invocation.getArguments()[1];
                 c.onCompletion(null, null);
                 return null;
             }
         });
-        KafkaBolt bolt = new KafkaBolt() {
+        KafkaBolt bolt = new KafkaBolt() {
             @Override
-            protected KafkaProducer mkProducer(Properties props) {
-                return producer;
+            protected KafkaProducer mkProducer(Properties props) {
+                return mockProducer;
             }
         };
         bolt.withTopicSelector("MY_TOPIC");
-
+
+        return bolt;
+    }
+
+    @SuppressWarnings({ "unchecked", "serial" })
+    @Test
+    public void testSimple() {
+        final KafkaProducer producer = mock(KafkaProducer.class);
--- End diff --

That would be fine.

---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207524658

--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
--- End diff --

Nice, didn't know about that one.

---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207524275

--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- (same diff as quoted above)
+        final KafkaProducer producer = mock(KafkaProducer.class);
--- End diff --

That's a possibility, though `KafkaBolt` has `KafkaProducer` as a private field so `MockProducer` can't be used. That could be fixed by changing that field to a `Producer` interface instead.

---
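The field-type change proposed here is the standard "program to an interface" move. A stripped-down sketch, with hypothetical stand-ins for the `Producer` interface and the bolt (none of these names are the PR's actual code), shows why it lets a recording fake replace the real producer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ProducerInterfaceDemo {

    // Stand-in for the org.apache.kafka.clients.producer.Producer interface.
    interface Producer { void send(String record); }

    // Analogous to KafkaProducer: unusable in a unit test because it would
    // need a real broker.
    static class RealProducer implements Producer {
        public void send(String record) {
            throw new IllegalStateException("no broker available in tests");
        }
    }

    // Bolt-like class: once the field is typed as the interface rather than
    // the concrete class, the overridable factory can return any implementation.
    static class Bolt {
        private Producer producer;  // interface type, not the concrete class

        protected Producer mkProducer(Properties props) { return new RealProducer(); }

        void prepare() { producer = mkProducer(new Properties()); }

        void execute(String record) { producer.send(record); }
    }

    // Recording fake, analogous to Kafka's MockProducer.
    static class RecordingProducer implements Producer {
        final List<String> history = new ArrayList<>();
        public void send(String record) { history.add(record); }
    }

    static int demo() {
        RecordingProducer fake = new RecordingProducer();
        Bolt bolt = new Bolt() {
            @Override
            protected Producer mkProducer(Properties props) { return fake; }
        };
        bolt.prepare();
        bolt.execute("KEY/VALUE");
        return fake.history.size();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the field typed `KafkaProducer`, the anonymous override could only return the concrete class, forcing the Mockito stubbing the reviewers wanted to remove.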
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207511238

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -62,6 +62,7 @@
     private OutputCollector collector;
     private TupleToKafkaMapper mapper;
     private KafkaTopicSelector topicSelector;
+    private Callback providedCallback;
--- End diff --

I think we need to define our own interface for this. Callback isn't serializable, so Storm won't be able to transfer it from the Nimbus submitter to the worker JVM.

---
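The serializability point can be checked with a stdlib-only experiment using stand-in interfaces (illustrative names, not the real Kafka or Storm types): a callback typed against a `Serializable` sub-interface survives the Java serialization round trip analogous to what happens at topology submission, while one typed against the plain interface fails with `NotSerializableException`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableCallbackDemo {

    // Stand-in for Kafka's Callback: note it is NOT Serializable,
    // which is the problem raised in the review.
    interface Callback { void onCompletion(Object metadata, Exception exception); }

    // The proposed fix: a sub-interface that also extends Serializable.
    interface SerializableCallback extends Callback, Serializable { }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    // A lambda targeting a Serializable functional interface round-trips fine.
    static boolean serializableCallbackSurvives() {
        SerializableCallback cb = (metadata, exception) -> { };
        try {
            byte[] bytes = serialize(cb);
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                Callback restored = (Callback) ois.readObject();
                restored.onCompletion(new Object(), null);  // still invocable
                return true;
            }
        } catch (Exception e) {
            return false;
        }
    }

    // A callback built against the plain interface cannot be serialized.
    static boolean plainCallbackFails() {
        Callback cb = new Callback() {
            public void onCompletion(Object metadata, Exception exception) { }
        };
        try {
            serialize(cb);
            return false;
        } catch (NotSerializableException expected) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(serializableCallbackSurvives() && plainCallbackFails());
    }
}
```

This is the same failure a user would hit at `StormSubmitter` time if a non-serializable callback were stored in a bolt field, which is why the thread converges on a dedicated serializable interface.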
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2790#discussion_r207515186

--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- (same hunk as quoted in the first comment above) --- End diff --

I'm wondering if we would be better off using https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/MockProducer.html? Our own stubbing ends up doing some weird things, e.g. returning null from `send`.

---
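For comparison, a sketch of what the test stubbing could look like with Kafka's `MockProducer` (assumes `kafka-clients` is on the classpath; the class and topic names are illustrative). With `autoComplete` set to true, `send` invokes the callback immediately and returns a real, already-completed `Future` rather than null, and `history()` records everything sent:

```java
import java.util.List;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

class MockProducerSketch {
    static int sendAndCount() {
        // autoComplete=true: each send() completes inline and fires its Callback.
        MockProducer<String, String> producer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());

        final boolean[] callbackRan = { false };
        producer.send(new ProducerRecord<>("MY_TOPIC", "key", "value"),
            (metadata, exception) -> callbackRan[0] = (exception == null));

        // history() keeps the records that were sent, so the test can assert on
        // them instead of re-implementing the producer's behavior with Mockito.
        List<ProducerRecord<String, String>> sent = producer.history();
        return callbackRan[0] ? sent.size() : -1;
    }
}
```

As the earlier comment notes, `KafkaBolt` would first need its private `KafkaProducer` field widened to the `Producer` interface for this to slot in.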
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2790#discussion_r207512766

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- (same `providedCallback` hunk as quoted above) --- End diff --

Also I'm not sure exactly what someone implementing this interface would need, but maybe we should add a prepare method to the interface as well, so people who need some configuration or the topology context can get access? What do you think?

---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2790#discussion_r207514281

--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
--- End diff --

I would prefer not to use internal classes. I think https://static.javadoc.io/org.mockito/mockito-core/2.20.0/org/mockito/Mockito.html#mocking_details can do the same thing.

---
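A sketch of the suggested replacement: `Mockito.mockingDetails(...)` is the public-API equivalent of the internal `MockUtil.isMock(...)` check (assumes Mockito 2.x; the class name here is illustrative):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockingDetails;

import java.util.List;

class MockCheckSketch {
    // Public replacement for org.mockito.internal.util.MockUtil.isMock(...):
    // mockingDetails() works on any object and reports whether Mockito created it.
    static boolean isMock(Object candidate) {
        return mockingDetails(candidate).isMock();
    }

    @SuppressWarnings("unchecked")
    static boolean demo() {
        List<String> mocked = (List<String>) mock(List.class);
        return isMock(mocked) && !isMock("plain string");
    }
}
```

Sticking to the public API avoids breakage when Mockito reshuffles its `org.mockito.internal` packages between releases.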
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2790#discussion_r207511912

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu
     return new KafkaProducer<>(props);
 }

+/**
+ * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
--- End diff --

I'm not sure we want to make this protected. The default makes use of some private fields (e.g. collector), so subclasses won't be able to implement this properly, and I'm also not really understanding why someone would need to override this, since the code that is already here is pretty important to the bolt working correctly.

---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2790#discussion_r207428218

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu
     return new KafkaProducer<>(props);
 }

+/**
+ * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
+    return (ignored, e) -> {
+        synchronized (collector) {
+            if (e != null) {
+                collector.reportError(e);
+                collector.fail(input);
+            } else {
+                collector.ack(input);
+            }
+
+            // User defined Callback
+            if (providedCallback != null) {
--- End diff --

The original behavior of the Callback is pretty important so we probably don't want a user-defined Callback to completely erase the original behavior. If the intent _**is**_ to totally redefine the Callback then one can simply subclass `KafkaBolt` and explicitly override `mkProducerCallback()`.

---
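The contract dfdemar describes — the bolt's ack/fail bookkeeping always runs, and the user-provided callback is invoked only afterwards, never instead — can be sketched without Storm or Kafka on the classpath. `Collector` and `Callback` below are stand-ins for Storm's `OutputCollector` and Kafka's producer `Callback`; the method name mirrors the PR's `mkProducerCallback`:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Kafka's producer Callback (a functional interface).
interface Callback {
    void onCompletion(Object metadata, Exception exception);
}

// Stand-in for Storm's OutputCollector, recording what happened for inspection.
class Collector {
    final List<String> events = new ArrayList<>();
    void ack(Object tuple)        { events.add("ack"); }
    void fail(Object tuple)       { events.add("fail"); }
    void reportError(Throwable t) { events.add("error"); }
}

class CallbackChain {
    /**
     * Build the producer callback: the bolt's ack/fail logic runs first,
     * and only then is the user-provided callback (if any) delegated to.
     */
    static Callback mkProducerCallback(Collector collector, Object input, Callback provided) {
        return (metadata, e) -> {
            synchronized (collector) {
                if (e != null) {
                    collector.reportError(e);
                    collector.fail(input);
                } else {
                    collector.ack(input);
                }
                // The user hook augments, and cannot erase, the bolt's behavior.
                if (provided != null) {
                    provided.onCompletion(metadata, e);
                }
            }
        };
    }
}
```

Synchronizing on the collector matches the PR's diff: the Kafka producer invokes callbacks from its I/O thread, so collector access must not race with the bolt's executor thread.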
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
GitHub user dfdemar opened a pull request:

    https://github.com/apache/storm/pull/2790

    STORM-3175 - Allow usage of custom Callback.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dfdemar/storm custom-kafkabolt-callback

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2790.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2790

commit 8246a89a23a7e23c448b86ea97b81a1f93d06680
Author: David DeMar
Date:   2018-08-03T02:37:17Z

    STORM-3175 - Allow usage of custom Callback.

---