Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207708527 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -17,73 +17,140 @@ */ package org.apache.storm.kafka.bolt; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; + +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.storm.Testing; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.testing.MkTupleParam; import org.apache.storm.tuple.Tuple; import org.junit.Test; -import org.mockito.ArgumentMatcher; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaBoltTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class); - - @SuppressWarnings({ "unchecked", "serial" }) - @Test - public void testSimple() { - final KafkaProducer<String, String> producer = mock(KafkaProducer.class); - when(producer.send(any(), any())).thenAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - Callback c = (Callback)invocation.getArguments()[1]; - c.onCompletion(null, null); 
- return null; - } - }); - KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() { + + private <K, V> KafkaBolt<K, V> makeBolt(Producer<K, V> producer) { + KafkaBolt<K, V> bolt = new KafkaBolt<K, V>() { @Override - protected KafkaProducer<String, String> mkProducer(Properties props) { + protected Producer<K, V> mkProducer(Properties props) { return producer; } }; bolt.withTopicSelector("MY_TOPIC"); - + + return bolt; + } + + private Tuple createTestTuple(String... values) { + MkTupleParam param = new MkTupleParam(); + param.setFields("key", "message"); + return Testing.testTuple(Arrays.asList(values), param); + } + + @Test + public void testSimple() { + MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null); + KafkaBolt<String, String> bolt = makeBolt(producer); + OutputCollector collector = mock(OutputCollector.class); TopologyContext context = mock(TopologyContext.class); Map<String, Object> conf = new HashMap<>(); bolt.prepare(conf, context, collector); - MkTupleParam param = new MkTupleParam(); - param.setFields("key", "message"); - Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), param); + + String key = "KEY"; + String value = "VALUE"; + Tuple testTuple = createTestTuple(key, value); bolt.execute(testTuple); - verify(producer).send(argThat(new ArgumentMatcher<ProducerRecord<String, String>>() { - @Override - public boolean matches(ProducerRecord<String, String> arg) { - LOG.info("GOT {} ->", arg); - LOG.info(" {} {} {}", arg.topic(), arg.key(), arg.value()); - return "MY_TOPIC".equals(arg.topic()) && - "KEY".equals(arg.key()) && - "VALUE".equals(arg.value()); - } - }), any(Callback.class)); + + assertThat(producer.history().size(), is(1)); + ProducerRecord<String, String> arg = producer.history().get(0); + + LOG.info("GOT {} ->", arg); + LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value()); + assertThat(arg.topic(), is("MY_TOPIC")); + assertThat(arg.key(), is(key)); + 
assertThat(arg.value(), is(value)); + + // Complete the send + producer.completeNext(); verify(collector).ack(testTuple); } + @Test + public void testSimpleWithError() { + MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null); + KafkaBolt<String, String> bolt = makeBolt(producer); + + OutputCollector collector = mock(OutputCollector.class); + TopologyContext context = mock(TopologyContext.class); + Map<String, Object> conf = new HashMap<>(); + bolt.prepare(conf, context, collector); + + String key = "KEY"; + String value = "VALUE"; + Tuple testTuple = createTestTuple(key, value); + bolt.execute(testTuple); + + assertThat(producer.history().size(), is(1)); + ProducerRecord<String, String> arg = producer.history().get(0); + + LOG.info("GOT {} ->", arg); + LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value()); + assertThat(arg.topic(), is("MY_TOPIC")); + assertThat(arg.key(), is(key)); + assertThat(arg.value(), is(value)); + + // Force a send error + KafkaException ex = new KafkaException(); + producer.errorNext(ex); + verify(collector).reportError(ex); + verify(collector).fail(testTuple); + } + + @Test + public void testCustomCallbackIsWrappedByDefaultCallbackBehavior() { + MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null); + KafkaBolt<String, String> bolt = makeBolt(producer); + + PreparableCallback customCallback = mock(PreparableCallback.class); + bolt.withProducerCallback(customCallback); + + OutputCollector collector = mock(OutputCollector.class); + bolt.prepare(new HashMap<>(), mock(TopologyContext.class), collector); + + String key = "KEY"; + String value = "VALUE"; + Tuple testTuple = createTestTuple(key, value); + bolt.execute(testTuple); + + assertThat(producer.history().size(), is(1)); + ProducerRecord<String, String> arg = producer.history().get(0); + + LOG.info("GOT {} ->", arg); + LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value()); + 
assertThat(arg.topic(), is("MY_TOPIC")); + assertThat(arg.key(), is(key)); + assertThat(arg.value(), is(value)); + + // Complete the send + producer.completeNext(); + verify(collector).ack(testTuple); + verify(customCallback).onCompletion(any(), any()); --- End diff -- Do you mean with regard to the `any()`? I'll see if I can make it a little better.
---