Github user tgravescs commented on a diff in the pull request:
https://github.com/apache/storm/pull/572#discussion_r33066884
--- Diff: external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java ---
@@ -102,12 +105,40 @@ public void execute(Tuple input) {
key = mapper.getKeyFromTuple(input);
message = mapper.getMessageFromTuple(input);
topic = topicSelector.getTopic(input);
- if(topic != null ) {
- producer.send(new KeyedMessage<K, V>(topic, key, message));
+ if (topic != null ) {
+ Callback callback = null;
+
+ if (!fireAndForget && async) {
+ callback = new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata ignored,
Exception e) {
+ synchronized(collector) {
+ if (e != null) {
+ collector.reportError(e);
+ collector.fail(input);
+ } else {
+ collector.ack(input);
+ }
+ }
+ }
+ };
+ }
+ Future<RecordMetadata> result = producer.send(new
ProducerRecord<K, V>(topic, key, message), callback);
+ if (!async) {
+ try {
+ result.get();
+ collector.ack(input);
+ } catch (ExecutionException err) {
+ collector.reportError(err);
+ collector.fail(input);
+ }
+ } else if (fireAndForget) {
--- End diff --
where did the fireAndForget option come from? Did someone request this?
I think the user can essentially set this by setting the "acks" kafka
producer config. Although looking I'm not sure that is exposed here. It seems
like this Bolt would be more useful if we allowed the user to producer configs.
I guess we can split that out into another jira though.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---