Hi Florian, thanks for your response. My topology has been running with ack-after-emit for a few days now, and it looks like that has solved the issue, thanks! Is there a reason why acking before the emit would cause the tuple not to actually be acked? Does the emit change the ack state of the tuple? (We had obviously been running under the assumption that once ack() was called, the tuple would be acknowledged, regardless of what was or wasn't emitted.)
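[Editor's note: the ordering question above can be illustrated with a toy model of the acker's XOR bookkeeping. This is a simplified sketch in plain Java, not Storm's actual acker class; all names are illustrative. The idea: for each tuple tree, the acker XORs in every edge id it is told about, and the tree counts as fully processed only when the accumulator returns to zero. A bolt's ack() reports the input id XORed with the edge ids of tuples it has anchored so far, so edges emitted after the ack are never reported and the tree can never reach zero.]

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of acker bookkeeping (illustrative, not Storm's real class).
// For each tuple tree (keyed by root id), XOR in every edge id reported;
// the tree counts as fully acked only when the accumulator hits 0.
class ToyAcker {
    private final Map<Long, Long> pending = new HashMap<>();

    void xorIn(long rootId, long val) {
        long v = pending.getOrDefault(rootId, 0L) ^ val;
        if (v == 0L) pending.remove(rootId);
        else pending.put(rootId, v);
    }

    int pendingTrees() { return pending.size(); }
}

class AckOrderDemo {
    public static void main(String[] args) {
        long root = 42L, e0 = 111L, e1 = 222L, e2 = 333L;

        // Ack AFTER emit: the bolt's ack carries input ^ all anchored edge ids.
        ToyAcker good = new ToyAcker();
        good.xorIn(root, e0);            // spout emits edge e0
        good.xorIn(root, e0 ^ e1 ^ e2);  // bolt acks e0 and reports new edges e1, e2
        good.xorIn(root, e1);            // downstream acks e1
        good.xorIn(root, e2);            // downstream acks e2
        System.out.println("ack after emit, pending trees: " + good.pendingTrees());  // 0

        // Ack BEFORE emit: e1 and e2 were not yet anchored when ack() ran,
        // so they are never XORed out and the tree can never reach zero.
        ToyAcker bad = new ToyAcker();
        bad.xorIn(root, e0);   // spout emits edge e0
        bad.xorIn(root, e0);   // bolt acks e0 with no edges attached yet
        bad.xorIn(root, e1);   // downstream acks e1
        bad.xorIn(root, e2);   // downstream acks e2
        System.out.println("ack before emit, pending trees: " + bad.pendingTrees());  // 1
    }
}
```

In the second scenario the entry stays in the acker's pending map until the message timeout expires it, which is consistent with the pending-map growth described further down in the thread.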
Thanks again.

From: Florian Hussonnois
Reply-To: "user@storm.apache.org<mailto:user@storm.apache.org>"
Date: Friday, August 21, 2015 at 12:10 AM
To: "user@storm.apache.org<mailto:user@storm.apache.org>"
Subject: Re: Workers maxing out heap, excessive GC, mk_acker_bolt->pending map is huge (0.9.5)

Hi,

You should ack the input tuple after emitting new ones:

try {
    // parse json string
    ...
    // then emit
} catch (Throwable t) {
    // nothing to recover
} finally {
    collector.ack(tuple);
}

Hope this will fix your issue.

On 21 August 2015 at 02:17, "Jason Chen" <jason.c...@hulu.com<mailto:jason.c...@hulu.com>> wrote:

Hi all. Here's what I'm seeing.

I've got a fairly simple topology, consisting of three components: a Kafka spout, a simple processing bolt (JSON parse to POJO, a bit of processing, and back to JSON), and a Kafka bolt (output). 12 workers, Xmx1G. It runs happily for a little over a day, then basically slows down/stops processing altogether.

The cluster is instrumented with storm-graphite (https://github.com/verisign/storm-graphite). When the topology is freshly deployed, spout complete latency averages around 5ms, and JVM heap usage starts at around 100MB across all workers, with very little PSMarkSweep GC activity. Heap usage slowly creeps up across all workers until they start brushing up against max heap. At that point the topology is pretty sad: spout complete latency averages 5sec, spout lag starts to increase, spout fail counts average 300/min, and PSMarkSweep GC averages 15+ seconds per run (!!!) at ~2 GC runs/worker/minute. The JVMs are pretty much hosed at this point, and I stop seeing the topology do much useful work.

I took a heap dump via JVisualVM of one of the maxed-out workers.
The majority of the heap usage is dominated by the following structure:

==============================================================================
field               type                              retained heap
------------------------------------------------------------------------------
this                acker                             1,132,188,840
state               Container                         1,132,188,816
object              acker$mk_acker_bolt$reify__803    1,132,188,792
output_collector    MutableObject                     184
pending             MutableObject                     1,132,188,568
o                   RotatingMap                       1,132,188,544
...

Within the acker's pending map, we eventually get to a huge HashMap (8,388,608 entries!) of <Long, PersistentArrayMap<Keyword, Long>> pairs, e.g.:

key: 3133635607298342113, value: [ clojure.lang.Keyword #11, 7092611081953912005 ]

This is probably what's causing my workers to run out of heap. But why is each worker keeping track of so many pending tuples? My processing RichBolt acks immediately (since I fail fast if I run into any parsing issues), so I can't think of a reason why so many tuples (8 million+?) would be pending. Any ideas what might be causing this apparent leak? I feel like we're running a pretty stock topology. Restarting the topology every day is the only reliable way to keep it running, which is not exactly the most scalable solution :p

Here are more details about my topology (including settings that I believe I've changed away from the defaults):

* storm-core, storm-kafka 0.9.5
* one acker per worker, so 12 ackers

.yaml config:
------------------------------------------------------------------------------------------------
topology.debug: false
topology.max.spout.pending: 2500  # The maximum number of tuples that can be pending on a spout task at any given time.
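[Editor's note: the 8-million-entry pending map is consistent with tuple trees that never complete. Each entry then lingers in the acker's time-bucketed pending structure until the message timeout rotates it out, which at high throughput means roughly (input rate x timeout) live entries. A simplified sketch of such a rotating map, in plain Java (illustrative only, not Storm's actual RotatingMap, though the real one is similarly bucket-based):]

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of a time-bucketed rotating map (illustrative only).
// New entries go in the newest bucket; each timeout tick rotates the oldest
// bucket out, expiring (failing) whatever never completed in time.
class ToyRotatingMap<K, V> {
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();

    ToyRotatingMap(int numBuckets) {
        for (int i = 0; i < numBuckets; i++) buckets.addLast(new HashMap<>());
    }

    // Track a new pending entry in the newest bucket.
    void put(K key, V value) { buckets.peekFirst().put(key, value); }

    // Remove a key from whichever bucket holds it (a completed tuple tree).
    void remove(K key) {
        for (Map<K, V> b : buckets) if (b.remove(key) != null) return;
    }

    // Called on each timeout tick: the oldest bucket's entries are expired.
    Map<K, V> rotate() {
        Map<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        return expired;
    }

    int size() {
        int n = 0;
        for (Map<K, V> b : buckets) n += b.size();
        return n;
    }
}
```

If remove() is never called (trees never complete), every put() stays resident for the full rotation window, so the map's size tracks throughput times timeout rather than the actual in-flight work.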
topology.spout.max.batch.size: 65 * 1024
topology.workers: 12
topology.worker.childopts: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=1%ID% -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/"
------------------------------------------------------------------------------------------------

Topology construction:
------------------------------------------------------------------------------------------------
public static void buildProcessingTopology(final TopologyBuilder builder, final Config config,
                                           final String environment) {
    final Map<String, Object> kafkaConfig =
        checkNotNull((Map<String, Object>) config.get(CFG_KAFKA));
    final Map<String, Object> zookeeperConfig =
        checkNotNull((Map<String, Object>) config.get(CFG_ZOOKEEPER));
    final ZkHosts hosts = new ZkHosts((String) zookeeperConfig.get("address"));
    final String CLIENT_ID = "storm-spout-" + environment;

    final SpoutConfig spoutConfig = new SpoutConfig(hosts,
        checkNotNull((String) kafkaConfig.get("input_topic")),
        checkNotNull((String) zookeeperConfig.get("root")),
        CLIENT_ID);
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
    builder.setBolt("processor", new ProcessorBolt("json"), 4)
           .shuffleGrouping("kafka-spout");
    builder.setBolt("kafka-output", new KafkaBolt<String, String>()
               .withTopicSelector(new DefaultTopicSelector(
                   checkNotNull((String) kafkaConfig.get("output_topic"))))
               .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "json")),
           1)
           .shuffleGrouping("processor");
}
------------------------------------------------------------------------------------------------

Processing bolt (simplified):
------------------------------------------------------------------------------------------------
public class ProcessorBolt extends BaseRichBolt {

    private String outputField;
    private OutputCollector
outputCollector;

    protected ProcessorBolt(final String outputField) {
        this.outputField = outputField;
    }

    public void prepare(final Map conf, final TopologyContext context,
                        final OutputCollector collector) {
        this.outputCollector = collector;
    }

    public void execute(final Tuple tuple) {
        final String wirejson = tuple.getString(0);
        // always ack, nothing to recover from if we can't parse/process
        this.outputCollector.ack(tuple);
        // parse json, process some stuff, get a list of maps back
        // ...
        for (Map<String, Object> m : maps) {
            String json = toJson(m);
            this.outputCollector.emit(tuple, new Values(json));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(this.outputField));
    }
}
------------------------------------------------------------------------------------------------

Let me know if you need any additional info. Thanks in advance,
-Jason
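[Editor's note: the fix the thread converges on is moving the ack() after the anchored emits, as in Florian's try/finally pattern. A self-contained sketch of that ordering, with Storm's OutputCollector replaced by a hypothetical Collector interface so the example compiles without storm-core; the class and method names are illustrative:]

```java
import java.util.List;

// Stand-in for Storm's OutputCollector, just enough to show call ordering.
interface Collector {
    void emit(String anchor, String value);
    void ack(String tuple);
}

class AckAfterEmitBolt {
    // Emit every output anchored to the input first, then ack the input
    // exactly once in a finally block, so the ack is sent only after all
    // emitted edges have been anchored to the input tuple.
    void execute(String tuple, List<String> outputs, Collector collector) {
        try {
            for (String json : outputs) {
                collector.emit(tuple, json);   // anchor each output to the input
            }
        } catch (Throwable t) {
            // nothing to recover; fall through to the ack
        } finally {
            collector.ack(tuple);              // ack last, after all emits
        }
    }
}
```

The finally block preserves the original fail-fast behavior (the input is always acked, even when parsing throws) while fixing the ordering problem.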