[jira] [Commented] (STORM-2914) Remove enable.auto.commit support from storm-kafka-client
[ https://issues.apache.org/jira/browse/STORM-2914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16342987#comment-16342987 ] Alexandre Vermeerbergen commented on STORM-2914: [~Srdo] We do set topology.max.spout.pending, and we already had a discussion in the past that until Storm 2.x, we had to use our by-pass to overcome the limitations of Storm 1.x back-pressure: See: [https://mail-archives.apache.org/mod_mbox/storm-user/201705.mbox/%3CCADeKz6qZG12mN-=gf+mpta1jwxdk8_wwz1npbcgslx7fga4...@mail.gmail.com%3E] (at subsequent posts). We observe no OOM, just the consumption from Spouts stops, regardless of whether it our own Kafka spout, the old Storm Kafka client or the newer Storm Kafka Client. Back to Storm 1.2.0 : any idea when I could test a fix which removes the cycling "WARN" message about metadata ? Thanks! > Remove enable.auto.commit support from storm-kafka-client > - > > Key: STORM-2914 > URL: https://issues.apache.org/jira/browse/STORM-2914 > Project: Apache Storm > Issue Type: Improvement > Components: storm-kafka-client >Affects Versions: 2.0.0, 1.2.0 >Reporter: Stig Rohde Døssing >Assignee: Stig Rohde Døssing >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The enable.auto.commit option causes the KafkaConsumer to periodically commit > the latest offsets it has returned from poll(). It is convenient for use > cases where messages are polled from Kafka and processed synchronously, in a > loop. > Due to https://issues.apache.org/jira/browse/STORM-2913 we'd really like to > store some metadata in Kafka when the spout commits. This is not possible > with enable.auto.commit. I took at look at what that setting actually does, > and it just causes the KafkaConsumer to call commitAsync during poll (and > during a few other operations, e.g. close and assign) with some interval. > Ideally I'd like to get rid of ProcessingGuarantee.NONE, since I think > ProcessingGuarantee.AT_MOST_ONCE covers the same use cases, and is likely > almost as fast. The primary difference between them is that AT_MOST_ONCE > commits synchronously. > If we really want to keep ProcessingGuarantee.NONE, I think we should make > our ProcessingGuarantee.NONE setting cause the spout to call commitAsync > after poll, and never use the enable.auto.commit option. This allows us to > include metadata in the commit. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (STORM-2915) How could I to get the fail Number in Bolt When I use Kafka Spout
[ https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gergo Hong updated STORM-2915: -- Summary: How could I to get the fail Number in Bolt When I use Kafka Spout (was: How could I to get the fail Numer in Bolt When I use Kafka Spout) > How could I to get the fail Number in Bolt When I use Kafka Spout > > > Key: STORM-2915 > URL: https://issues.apache.org/jira/browse/STORM-2915 > Project: Apache Storm > Issue Type: New Feature > Components: storm-kafka-client >Affects Versions: 1.0.2, 1.1.0, 1.0.3, 1.0.4, 1.1.1, 1.0.5 >Reporter: Gergo Hong >Priority: Minor > > I want to get fail num in bolt , how could I to get it? > if fail it retry, I see This > if (!isScheduled || retryService.isReady(msgId)) { > final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) > tuple).getStream() : Utils.DEFAULT_STREAM_ID; > if (!isAtLeastOnceProcessing()) { > if (kafkaSpoutConfig.isTupleTrackingEnforced()) { > collector.emit(stream, tuple, msgId); > LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, > record, msgId); > } else { > collector.emit(stream, tuple); > LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); > } > } else { > emitted.add(msgId); > offsetManagers.get(tp).addToEmitMsgs(msgId.offset()); > if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from > schedule. > retryService.remove(msgId); > } > collector.emit(stream, tuple, msgId); > tupleListener.onEmit(tuple, msgId); > LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, > record, msgId); > } > return true; > } -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (STORM-2915) How could I to get the fail Numer in Bolt When I use Kafka Spout
Gergo Hong created STORM-2915: - Summary: How could I to get the fail Numer in Bolt When I use Kafka Spout Key: STORM-2915 URL: https://issues.apache.org/jira/browse/STORM-2915 Project: Apache Storm Issue Type: New Feature Components: storm-kafka-client Affects Versions: 1.0.5, 1.1.1, 1.0.4, 1.0.3, 1.1.0, 1.0.2 Reporter: Gergo Hong I want to get fail num in bolt , how could I to get it? if fail it retry, I see This if (!isScheduled || retryService.isReady(msgId)) { final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID; if (!isAtLeastOnceProcessing()) { if (kafkaSpoutConfig.isTupleTrackingEnforced()) { collector.emit(stream, tuple, msgId); LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId); } else { collector.emit(stream, tuple); LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); } } else { emitted.add(msgId); offsetManagers.get(tp).addToEmitMsgs(msgId.offset()); if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule. retryService.remove(msgId); } collector.emit(stream, tuple, msgId); tupleListener.onEmit(tuple, msgId); LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId); } return true; } -- This message was sent by Atlassian JIRA (v7.6.3#76005)