[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r208577649
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/PreparableCallback.java
 ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.bolt;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Serializable callback for use with the KafkaProducer on KafkaBolt.
+ */
+public interface PreparableCallback extends Callback, Serializable {
+void prepare(Map topoConf, TopologyContext context);
--- End diff --

I'm a little unsure what you're asking. It's an extension of the Kafka 
Callback interface, which adds the prepare method that fits Storm. 
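
As a rough illustration of that shape, here is a self-contained sketch. `DemoCallback`, `DemoPreparableCallback` and `FailureCountingCallback` are made-up stand-ins so the snippet compiles without Kafka or Storm on the classpath, and a plain `Object` takes the place of `TopologyContext`. It assumes the callback is shipped with Java serialization, and shows what `prepare` buys you: `transient` state is skipped when the callback is serialized and can be rebuilt on the worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for Kafka's Callback (assumption: simplified signature).
interface DemoCallback {
    void onCompletion(Object metadata, Exception exception);
}

// Stand-in for the PreparableCallback in this PR; Object replaces TopologyContext.
interface DemoPreparableCallback extends DemoCallback, Serializable {
    void prepare(Map<String, Object> topoConf, Object context);
}

// Hypothetical user callback that counts failed sends.
class FailureCountingCallback implements DemoPreparableCallback {
    private static final long serialVersionUID = 1L;

    private final String name;             // travels with the serialized callback
    private transient AtomicLong failures; // skipped by serialization, created in prepare

    FailureCountingCallback(String name) {
        this.name = name;
    }

    @Override
    public void prepare(Map<String, Object> topoConf, Object context) {
        failures = new AtomicLong();
    }

    @Override
    public void onCompletion(Object metadata, Exception exception) {
        if (exception != null) {
            failures.incrementAndGet();
        }
    }

    String name() {
        return name;
    }

    long failureCount() {
        return failures.get();
    }
}

class PreparableCallbackRoundTrip {
    // Serializes and deserializes a value, roughly what happens between submission and the worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FailureCountingCallback copy = roundTrip(new FailureCountingCallback("kafka-bolt"));
        copy.prepare(new HashMap<>(), null);          // rebuilds the transient counter
        copy.onCompletion(null, new RuntimeException("send failed"));
        copy.onCompletion(null, null);                // successful send, not counted
        System.out.println(copy.name() + " failures: " + copy.failureCount());
    }
}
```

Calling `onCompletion` on the deserialized copy before `prepare` would hit a null counter, which is the gap the extra method is meant to close.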


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-07 Thread danny0405
Github user danny0405 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207771586
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/PreparableCallback.java
 ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.bolt;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Serializable callback for use with the KafkaProducer on KafkaBolt.
+ */
+public interface PreparableCallback extends Callback, Serializable {
+void prepare(Map topoConf, TopologyContext context);
--- End diff --

Why do we name it PreparableCallback if it is used just for KafkaBolt? Is 
there any reusable class in the core?


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2790


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-05 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207761956
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -17,73 +17,140 @@
  */
 package org.apache.storm.kafka.bolt;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.storm.Testing;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.MkTupleParam;
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
-
-@SuppressWarnings({ "unchecked", "serial" })
-@Test
-public void testSimple() {
-final KafkaProducer producer = 
mock(KafkaProducer.class);
-when(producer.send(any(), any())).thenAnswer(new Answer() {
-@Override
-public Object answer(InvocationOnMock invocation) throws 
Throwable {
-Callback c = (Callback)invocation.getArguments()[1];
-c.onCompletion(null, null);
-return null;
-}
-});
-KafkaBolt bolt = new KafkaBolt() {
+
+private  KafkaBolt makeBolt(Producer producer) {
+KafkaBolt bolt = new KafkaBolt() {
 @Override
-protected KafkaProducer mkProducer(Properties 
props) {
+protected Producer mkProducer(Properties props) {
 return producer;
 }
 };
 bolt.withTopicSelector("MY_TOPIC");
-
+
+return bolt;
+}
+
+private Tuple createTestTuple(String... values) {
+MkTupleParam param = new MkTupleParam();
+param.setFields("key", "message");
+return Testing.testTuple(Arrays.asList(values), param);
+}
+
+@Test
+public void testSimple() {
+MockProducer producer = new 
MockProducer<>(Cluster.empty(), false, null, null, null);
+KafkaBolt bolt = makeBolt(producer);
+
 OutputCollector collector = mock(OutputCollector.class);
 TopologyContext context = mock(TopologyContext.class);
 Map conf = new HashMap<>();
 bolt.prepare(conf, context, collector);
-MkTupleParam param = new MkTupleParam();
-param.setFields("key", "message");
-Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), 
param);
+
+String key = "KEY";
+String value = "VALUE";
+Tuple testTuple = createTestTuple(key, value);
 bolt.execute(testTuple);
-verify(producer).send(argThat(new 
ArgumentMatcher>() {
-@Override
-public boolean matches(ProducerRecord arg) {
-LOG.info("GOT {} ->", arg);
-LOG.info("  {} {} {}", arg.topic(), arg.key(), 
arg.value());
-return "MY_TOPIC".equals(arg.topic()) &&
-"KEY".equals(arg.key()) &&
-"VALUE".equals(arg.value());
-}
-}), any(Callback.class));
+
+assertThat(producer.history().size(), is(1));
+ProducerRecord arg = producer.history().get(0);
+
+LOG.info("GOT {} ->", arg);
+LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
+assertThat(arg.topic(), is("MY_TOPIC"));
+assertThat(arg.key(), is(key));
+assertThat(arg.value(), is(value));
+
+// Complete the send
+producer.completeNext();
 verify(collector).ack(testTuple);
 }
 
+

[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-04 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207711135
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---

[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207709023
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---

[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-04 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207708527
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---

[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207706595
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---

[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207671937
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

That looks good to me. I'm also not sure about the collector, but I'd maybe 
lean toward not including it. The callback won't have access to incoming 
tuples, so the only use would be if someone wanted the callback to emit a new 
unanchored tuple.

I wouldn't worry about people misusing the prepare method. They can just as 
easily override the bolt's prepare method. 


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207649108
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

What about something like this? Not sure if `OutputCollector` is necessary 
or not.

```java
public interface PreparedCallback extends Callback, Serializable {
void prepare(Map topoConf, TopologyContext context, 
OutputCollector outputCollector);
}
```

So if the `PreparedCallback` is not null, then KafkaBolt would call 
`.prepare()` during `KafkaBolt.prepare()`.

The one concern I have is that it might be open for abuse by tempting 
people to use it to inject logic into the bolt preparation that's unrelated to 
Kafka publishing.
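
To make the null check concrete, here is a minimal sketch of the bolt side. Everything in it is a stand-in invented for illustration (`SketchCallback`, `SketchPreparedCallback`, `SketchKafkaBolt`, the `withProducerCallback` setter), and `Object` replaces both `TopologyContext` and `OutputCollector` so it compiles without Storm or Kafka. It is not the actual `KafkaBolt` code.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Stand-in for Kafka's Callback (assumption: simplified signature).
interface SketchCallback {
    void onCompletion(Object metadata, Exception exception);
}

// Stand-in for the proposed interface; Object replaces the Storm types.
interface SketchPreparedCallback extends SketchCallback, Serializable {
    void prepare(Map<String, Object> topoConf, Object context, Object outputCollector);
}

// Hypothetical bolt that forwards its own prepare arguments to the callback.
class SketchKafkaBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    private SketchPreparedCallback providedCallback; // stays null if the user sets nothing

    SketchKafkaBolt withProducerCallback(SketchPreparedCallback callback) {
        this.providedCallback = callback;
        return this;
    }

    void prepare(Map<String, Object> topoConf, Object context, Object outputCollector) {
        // Producer setup would happen here in a real bolt.
        if (providedCallback != null) {
            providedCallback.prepare(topoConf, context, outputCollector);
        }
    }
}

// Hypothetical callback that only records how often it was prepared.
class RecordingPreparedCallback implements SketchPreparedCallback {
    private static final long serialVersionUID = 1L;

    int prepareCalls;

    @Override
    public void prepare(Map<String, Object> topoConf, Object context, Object outputCollector) {
        prepareCalls++;
    }

    @Override
    public void onCompletion(Object metadata, Exception exception) {
        // Nothing to do in this sketch.
    }
}

class PrepareHookSketch {
    public static void main(String[] args) {
        RecordingPreparedCallback callback = new RecordingPreparedCallback();
        new SketchKafkaBolt().withProducerCallback(callback).prepare(new HashMap<>(), null, null);
        new SketchKafkaBolt().prepare(new HashMap<>(), null, null); // no callback, nothing to prepare
        System.out.println("prepare calls: " + callback.prepareCalls);
    }
}
```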


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207560866
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, 
TopologyContext context, Outpu
 return new KafkaProducer<>(props);
 }
 
+/**
+ * Make the producer Callback. Using this Callback will also execute 
the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
--- End diff --

Fair point. I'll change it to `private`.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207537361
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

Ah, good point. The workaround I was using prior to this was all done after 
serialization, so this wasn't something I had to worry about. Let me think about 
what that interface should look like, and I'll come up with something.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207534991
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, 
TopologyContext context, Outpu
 return new KafkaProducer<>(props);
 }
 
+/**
+ * Make the producer Callback. Using this Callback will also execute 
the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
--- End diff --

I can change to `private`.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207526335
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
-
+
 @SuppressWarnings({ "unchecked", "serial" })
-@Test
-public void testSimple() {
-final KafkaProducer producer = 
mock(KafkaProducer.class);
-when(producer.send(any(), any())).thenAnswer(new Answer() {
+private  KafkaBolt makeBolt(KafkaProducer 
mockProducer) {
+Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer 
should be a mock object.");
+when(mockProducer.send(any(), any())).thenAnswer(new 
Answer() {
 @Override
 public Object answer(InvocationOnMock invocation) throws 
Throwable {
 Callback c = (Callback)invocation.getArguments()[1];
 c.onCompletion(null, null);
 return null;
 }
 });
-KafkaBolt bolt = new KafkaBolt() {
+KafkaBolt bolt = new KafkaBolt() {
 @Override
-protected KafkaProducer mkProducer(Properties 
props) {
-return producer;
+protected KafkaProducer mkProducer(Properties props) {
+return mockProducer;
 }
 };
 bolt.withTopicSelector("MY_TOPIC");
-
+
+return bolt;
+}
+
+@SuppressWarnings({ "unchecked", "serial" })
+@Test
+public void testSimple() {
+final KafkaProducer producer = 
mock(KafkaProducer.class);
--- End diff --

That would be fine.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207524658
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
--- End diff --

Nice, didn't know about that one.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207524275
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
-
+
 @SuppressWarnings({ "unchecked", "serial" })
-@Test
-public void testSimple() {
-final KafkaProducer producer = 
mock(KafkaProducer.class);
-when(producer.send(any(), any())).thenAnswer(new Answer() {
+private  KafkaBolt makeBolt(KafkaProducer 
mockProducer) {
+Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer 
should be a mock object.");
+when(mockProducer.send(any(), any())).thenAnswer(new 
Answer() {
 @Override
 public Object answer(InvocationOnMock invocation) throws 
Throwable {
 Callback c = (Callback)invocation.getArguments()[1];
 c.onCompletion(null, null);
 return null;
 }
 });
-KafkaBolt bolt = new KafkaBolt() {
+KafkaBolt bolt = new KafkaBolt() {
 @Override
-protected KafkaProducer mkProducer(Properties 
props) {
-return producer;
+protected KafkaProducer mkProducer(Properties props) {
+return mockProducer;
 }
 };
 bolt.withTopicSelector("MY_TOPIC");
-
+
+return bolt;
+}
+
+@SuppressWarnings({ "unchecked", "serial" })
+@Test
+public void testSimple() {
+final KafkaProducer producer = mock(KafkaProducer.class);
--- End diff --

That's a possibility, though `KafkaBolt` has `KafkaProducer` as a private 
field so `MockProducer` can't be used. That could be fixed by changing that 
field to a `Producer` interface instead.
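
A minimal sketch of that idea follows. It deliberately avoids the real Kafka and Storm classes: `Producer`, `RealProducer`, `FakeProducer` and `Bolt` are hypothetical stand-ins, and `FakeProducer` only plays the role that `MockProducer` would play. The point is just that once the field is typed as the interface, the factory method can hand back any implementation.

```java
import java.util.ArrayList;
import java.util.List;

class ProducerFieldSketch {
    /** Hypothetical stand-in for Kafka's Producer interface. */
    interface Producer {
        void send(String record);
    }

    /** Hypothetical stand-in for the concrete KafkaProducer. */
    static class RealProducer implements Producer {
        @Override
        public void send(String record) {
            // A real producer would talk to a broker here.
        }
    }

    /** Hypothetical test double, in the role MockProducer would play. */
    static class FakeProducer implements Producer {
        final List<String> sent = new ArrayList<>();

        @Override
        public void send(String record) {
            sent.add(record);
        }
    }

    /** Simplified bolt whose field is typed as the interface, not the class. */
    static class Bolt {
        private Producer producer;

        protected Producer mkProducer() {
            return new RealProducer();
        }

        void prepare() {
            producer = mkProducer();
        }

        void execute(String value) {
            producer.send(value);
        }
    }

    public static void main(String[] args) {
        FakeProducer fake = new FakeProducer();
        // The test overrides the factory method and returns the test double.
        Bolt bolt = new Bolt() {
            @Override
            protected Producer mkProducer() {
                return fake;
            }
        };
        bolt.prepare();
        bolt.execute("hello");
        System.out.println(fake.sent); // prints [hello]
    }
}
```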


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207511238
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

I think we need to define our own interface for this. Callback isn't 
serializable, so Storm won't be able to transfer it from the Nimbus submitter 
to the worker JVM.
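
To illustrate the serialization concern, here is a small sketch using plain Java serialization. The `Callback` interface below is a hypothetical stand-in with a simplified signature (the real Kafka interface takes `RecordMetadata`), and `SerializableCallback`, `PlainCallback` and `LoggingCallback` are made-up names for this example only.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializableCallbackSketch {
    /** Hypothetical stand-in for Kafka's Callback, which is not Serializable. */
    interface Callback {
        void onCompletion(Object metadata, Exception exception);
    }

    /** Hypothetical interface of our own that adds Serializable. */
    interface SerializableCallback extends Callback, Serializable {
    }

    static class PlainCallback implements Callback {
        @Override
        public void onCompletion(Object metadata, Exception exception) {
        }
    }

    static class LoggingCallback implements SerializableCallback {
        private static final long serialVersionUID = 1L;

        @Override
        public void onCompletion(Object metadata, Exception exception) {
        }
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new PlainCallback()));   // prints false
        System.out.println(canSerialize(new LoggingCallback())); // prints true
    }
}
```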


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207515186
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
 private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class);
-
+
 @SuppressWarnings({ "unchecked", "serial" })
-@Test
-public void testSimple() {
-final KafkaProducer producer = mock(KafkaProducer.class);
-when(producer.send(any(), any())).thenAnswer(new Answer() {
+private  KafkaBolt makeBolt(KafkaProducer mockProducer) {
+Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer should be a mock object.");
+when(mockProducer.send(any(), any())).thenAnswer(new Answer() {
 @Override
 public Object answer(InvocationOnMock invocation) throws Throwable {
 Callback c = (Callback)invocation.getArguments()[1];
 c.onCompletion(null, null);
 return null;
 }
 });
-KafkaBolt bolt = new KafkaBolt() {
+KafkaBolt bolt = new KafkaBolt() {
 @Override
-protected KafkaProducer mkProducer(Properties props) {
-return producer;
+protected KafkaProducer mkProducer(Properties props) {
+return mockProducer;
 }
 };
 bolt.withTopicSelector("MY_TOPIC");
-
+
+return bolt;
+}
+
+@SuppressWarnings({ "unchecked", "serial" })
+@Test
+public void testSimple() {
+final KafkaProducer producer = mock(KafkaProducer.class);
--- End diff --

I'm wondering if we would be better off using `MockProducer` 
(https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/MockProducer.html)? 
Our own stubbing ends up doing some weird things, e.g. returning null from 
`send`.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207512766
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

Also I'm not sure exactly what someone implementing this interface would 
need, but maybe we should add a prepare method to the interface as well, so 
people who need some configuration or the topology context can get access? What 
do you think?
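
A rough sketch of what such a prepare method could enable is shown below. `Callback` and `Context` are hypothetical, heavily reduced stand-ins for the Kafka and Storm types, and the config key `example.prefix` and the class `PrefixingCallback` are invented for the example.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class PreparableCallbackSketch {
    /** Hypothetical stand-in for Kafka's Callback. */
    interface Callback {
        void onCompletion(Object metadata, Exception exception);
    }

    /** Hypothetical, greatly reduced stand-in for Storm's TopologyContext. */
    static class Context {
        final String componentId;

        Context(String componentId) {
            this.componentId = componentId;
        }
    }

    /** Callback that can be set up once it reaches the worker. */
    interface PreparableCallback extends Callback, Serializable {
        void prepare(Map<String, Object> topoConf, Context context);
    }

    static class PrefixingCallback implements PreparableCallback {
        private static final long serialVersionUID = 1L;
        // Built in prepare(), so it does not need to travel with the object.
        private transient String prefix;
        String lastMessage;

        @Override
        public void prepare(Map<String, Object> topoConf, Context context) {
            // "example.prefix" is an invented config key for this sketch.
            prefix = topoConf.getOrDefault("example.prefix", "kafka") + "/" + context.componentId;
        }

        @Override
        public void onCompletion(Object metadata, Exception exception) {
            String outcome = exception == null ? "ok" : "failed: " + exception.getMessage();
            lastMessage = prefix + ": " + outcome;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("example.prefix", "audit");
        PrefixingCallback callback = new PrefixingCallback();
        callback.prepare(conf, new Context("kafka-bolt"));
        callback.onCompletion(null, null);
        System.out.println(callback.lastMessage); // prints audit/kafka-bolt: ok
    }
}
```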


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207514281
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
--- End diff --

I would prefer not to use internal classes. I think `Mockito.mockingDetails` 
(https://static.javadoc.io/org.mockito/mockito-core/2.20.0/org/mockito/Mockito.html#mocking_details) 
can do the same thing.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207511912
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu
 return new KafkaProducer<>(props);
 }
 
+/**
+ * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
--- End diff --

I'm not sure we want to make this protected. The default makes use of some 
private fields (e.g. collector), so subclasses won't be able to implement this 
properly. I also don't really understand why someone would need to override 
this, since the code that is already here is pretty important to the bolt 
working correctly.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-02 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207428218
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu
 return new KafkaProducer<>(props);
 }
 
+/**
+ * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
+return (ignored, e) -> {
+synchronized (collector) {
+if (e != null) {
+collector.reportError(e);
+collector.fail(input);
+} else {
+collector.ack(input);
+}
+
+// User defined Callback
+if (providedCallback != null) {
--- End diff --

The original behavior of the Callback is pretty important so we probably 
don't want a user-defined Callback to completely erase the original behavior. 
If the intent _**is**_ to totally redefine the Callback then one can simply 
subclass `KafkaBolt` and explicitly override `mkProducerCallback()`.
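
The wrapping behavior described here can be sketched roughly as follows. This is not the PR's code: `Callback` and `Collector` are hypothetical stand-ins for the Kafka and Storm types, tuples are reduced to strings, and calling the user callback inside the synchronized block is an assumption based on the quoted diff.

```java
import java.util.ArrayList;
import java.util.List;

class WrappedCallbackSketch {
    /** Hypothetical stand-in for Kafka's Callback. */
    interface Callback {
        void onCompletion(Object metadata, Exception exception);
    }

    /** Hypothetical stand-in for OutputCollector that records what it is told. */
    static class Collector {
        final List<String> events = new ArrayList<>();

        void ack(String tuple) {
            events.add("ack:" + tuple);
        }

        void fail(String tuple) {
            events.add("fail:" + tuple);
        }

        void reportError(Exception e) {
            events.add("error:" + e.getMessage());
        }
    }

    /** Default ack/fail handling always runs; the user callback runs afterwards. */
    static Callback mkProducerCallback(Collector collector, String input, Callback provided) {
        return (metadata, e) -> {
            synchronized (collector) {
                if (e != null) {
                    collector.reportError(e);
                    collector.fail(input);
                } else {
                    collector.ack(input);
                }
                // The user-defined callback adds behavior, it does not replace it.
                if (provided != null) {
                    provided.onCompletion(metadata, e);
                }
            }
        };
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        List<String> userEvents = new ArrayList<>();
        Callback user = (metadata, e) -> userEvents.add(e == null ? "ok" : e.getMessage());

        mkProducerCallback(collector, "t1", user).onCompletion(null, null);
        mkProducerCallback(collector, "t2", user).onCompletion(null, new RuntimeException("boom"));

        System.out.println(collector.events); // prints [ack:t1, error:boom, fail:t2]
        System.out.println(userEvents);       // prints [ok, boom]
    }
}
```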


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-02 Thread dfdemar
GitHub user dfdemar opened a pull request:

https://github.com/apache/storm/pull/2790

STORM-3175 - Allow usage of custom Callback.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dfdemar/storm custom-kafkabolt-callback

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2790.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2790


commit 8246a89a23a7e23c448b86ea97b81a1f93d06680
Author: David DeMar 
Date:   2018-08-03T02:37:17Z

STORM-3175 - Allow usage of custom Callback.




---