Hi Florian, thanks for your response.  My topology has been running with 
ack-after-emit for a few days now, and it looks like it has solved the issue, 
thanks!  Any reason why acking before emit would cause the tuple to not be 
actually acked? Does the emit change the ack-state of the tuple?  (We obviously 
were running with the assumption that after ack() was called, that the tuple 
would be acknowledged, regardless of what was emitted or not).

Thanks again.

From: Florian Hussonnois
Reply-To: "user@storm.apache.org<mailto:user@storm.apache.org>"
Date: Friday, August 21, 2015 at 12:10 AM
To: "user@storm.apache.org<mailto:user@storm.apache.org>"
Subject: Re: Workers maxing out heap, excessive GC, mk_acker_bolt->pending map 
is huge (0.9.5)


Hi,

You should ack input tuple after emitting new ones :

try {
// parse json string
...
// then emit
} catch (Throwable t) {
/*nothing to recover */
} finally {
collector.ack(tuple)
}

Hope this will fix your issue.

Le 21 août 2015 02:17, "Jason Chen" 
<jason.c...@hulu.com<mailto:jason.c...@hulu.com>> a écrit :
Hi all.

Here’s what I’m seeing.  I’ve got a fairly simple topology, consisting of 3 
bolts.  Kafka spout, simple processing bolt (JSON parse to POJO, a bit of 
processing, and back to JSON), and Kafka Bolt (output).  12 workers, Xmx1G.  It 
runs happily for a little over a day, then basically slows down/stops 
processing altogether.  Cluster is instrumented with storm-graphite 
(https://github.com/verisign/storm-graphite).  When the topology is freshly 
deployed, spout complete latency averages around 5ms, and JVM heap usage starts 
at around 100MB across all workers.  Very little PSMarkSweep GC activity.  The 
workers slowly creep up in heap usage across all workers, until they start 
brushing up against max heap.  At this point, the topology is pretty sad: spout 
complete latency averages 5sec, spout lag starts to increase, spout fail counts 
average 300/min, and PSMarkSweep GC averages 15+ seconds per run (!!!) 
averaging ~2 GC runs/worker/minute.  The JVMs are pretty much hosed at this 
point, and I stop seeing the topology doing much useful work.

I took a heapdump via JVisualVM of one of the maxed out workers.  The majority 
of the heap usage is dominated by the following structure:

==============================================================================
field              type                            retained heap
------------------------------------------------------------------------------
this               acker                           1,132,188,840
state              Container                       1,132,188,816
object             acker$mk_acker_bolt$reify__803  1,132,188,792
output_collector   MutableObject                   184
pending            MutableObject                   1,132,188,568
  o                RotatingMap                     1,132,188,544
...

Within the acker’s pending map, eventually we get to a huge HashMap (8,388,608 
items!) with <Long, PersistentArrayMap<Keyword, Long> pairs, eg:
key: 3133635607298342113, value: [ clojure.lang.Keyword #11, 
7092611081953912005 ]

This is probably what’s causing my workers to run out of heap.  But why is each 
worker keeping track of so many pending (tuples?)  My processing RichBolt acks 
immediately (since I fail-fast if I run into any parsing issues), so I can’t 
think of a reason why so many tuples would be pending (8 million+ ?).

Any ideas what might be causing this apparent leak?  I feel like we’re running 
a pretty stock topology.  Restarting the topology every day is the only 
reliable way to keep my topology running, not exactly the most scalable 
solution :p

Here are more details about my topology (including settings that I believe I’ve 
changed away from the default).
* storm-core, storm-kafka 0.9.5
* one acker per worker, so 12 ackers.

.yaml Config:
------------------------------------------------------------------------------------------------
topology.debug: false
topology.max.spout.pending: 2500 # The maximum number of tuples that can be 
pending on a spout task at any given time.
topology.spout.max.batch.size: 65 * 1024
topology.workers: 12

topology.worker.childopts: "-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.port=1%ID% -XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp/“
------------------------------------------------------------------------------------------------

Topology Construction:
------------------------------------------------------------------------------------------------
public static void buildProcessingTopology(final TopologyBuilder builder, final 
Config config, final String environment) {
    final Map<String, Object> kafkaConfig = checkNotNull((Map<String, Object>) 
config.get(CFG_KAFKA));
    final Map<String, Object> zookeeperConfig = checkNotNull((Map<String, 
Object>) config.get(CFG_ZOOKEEPER));

    final ZkHosts hosts = new ZkHosts((String) zookeeperConfig.get("address"));
    final String CLIENT_ID = "storm-spout-" + environment;
    final SpoutConfig spoutConfig = new SpoutConfig(hosts, 
checkNotNull((String) kafkaConfig.get("input_topic")),
                                                    checkNotNull((String) 
zookeeperConfig.get("root")), CLIENT_ID);
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());


    builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);

    builder.setBolt("processor", new ProcessorBolt("json"), 
4).shuffleGrouping("kafka-spout");

    builder.setBolt("kafka-output",
                    new KafkaBolt<String, String>()
                        .withTopicSelector(new 
DefaultTopicSelector(checkNotNull((String) kafkaConfig.get("output_topic"))))
                        .withTupleToKafkaMapper(new 
FieldNameBasedTupleToKafkaMapper("key", "json")),
                    1).shuffleGrouping("processor");
}
------------------------------------------------------------------------------------------------


Processing Bolt (simplified):
------------------------------------------------------------------------------------------------
public class ProcessorBolt extends BaseRichBolt {

    private String outputField;
    private OutputCollector outputCollector;

    protected ProcessorBolt(final String outputField) {
        this.outputField = outputField;
    }

    public void prepare(final Map conf, final TopologyContext context, final 
OutputCollector collector) {
        this.outputCollector = collector;
    }

    public void execute(final Tuple tuple) {
        final String wirejson = tuple.getString(0);

        // always ack, nothing to recover from if we can't parse/process
        this.outputCollector.ack(tuple);

        // parse json, process some stuff, get a list of maps back
        // ...

        for(Map<String, Object> m : maps) {
            String json = toJson(m);

            this.outputCollector.emit(tuple, new Values(json));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(this.outputField));
    }
}
------------------------------------------------------------------------------------------------

Let me know if you need any additional info.

Thanks in advance,
-Jason

Reply via email to