[ https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gergo Hong updated STORM-2915: ------------------------------ Summary: How could I to get the fail Number in Bolt When I use Kafka Spout (was: How could I to get the fail Numer in Bolt When I use Kafka Spout) > How could I to get the fail Number in Bolt When I use Kafka Spout > -------------------------------------------------------------------- > > Key: STORM-2915 > URL: https://issues.apache.org/jira/browse/STORM-2915 > Project: Apache Storm > Issue Type: New Feature > Components: storm-kafka-client > Affects Versions: 1.0.2, 1.1.0, 1.0.3, 1.0.4, 1.1.1, 1.0.5 > Reporter: Gergo Hong > Priority: Minor > > I want to get the number of failures for a tuple in a bolt. How can I get it? > When a tuple fails, it is retried. I see this code: > if (!isScheduled || retryService.isReady(msgId)) { > final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) > tuple).getStream() : Utils.DEFAULT_STREAM_ID; > if (!isAtLeastOnceProcessing()) { > if (kafkaSpoutConfig.isTupleTrackingEnforced()) { > collector.emit(stream, tuple, msgId); > LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, > record, msgId); > } else { > collector.emit(stream, tuple); > LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); > } > } else { > emitted.add(msgId); > offsetManagers.get(tp).addToEmitMsgs(msgId.offset()); > if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from > schedule. > retryService.remove(msgId); > } > collector.emit(stream, tuple, msgId); > tupleListener.onEmit(tuple, msgId); > LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, > record, msgId); > } > return true; > } -- This message was sent by Atlassian JIRA (v7.6.3#76005)