Hi Vladimir -

> Why we didn't see any emitted data from builderBolt with small
"topology.max.spout.pending"?

This is because the topology.max.spout.pending setting prevented the spout
from consuming additional messages before there were enough messages for
Storm's event time queue to flush a set of messages to the Profiler.


> The question is how parameter "topology.max.spout.pending" depend on
amount of events?

I would recommend not setting this value for the Profiler.


> Why we had errors when "topology.max.spout.pending" was 10000?  
> java.lang.OutOfMemoryError:
GC overhead limit exceeded ...

Once you allowed the spout to consume enough messages to do some real work,
there was not enough memory for the heap.  What is the maximum heap size
that you are using for your Storm workers?  You will likely want to
increase this.


> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing.

You should start by increasing the number of executors to allow the
topology to process messages faster.


Hope this helps





On Tue, Jan 28, 2020 at 7:07 AM Vladimir Mikhailov <
[email protected]> wrote:

> We are trying to tune performance for profiler topology now. In config
> file for profiler there are no many parameters to do this. Therefore we've
> tried to change "topology.max.spot.pending". And we can't undestand how
> profiler performance depend on this parameter.
>
> We have about 6000-7000 events per minute and we changed
> "profiler.period.duration" to 1 minute and put the next profiler
> configuratoin:
>
>  {
>   "profiles" : [ {
>     "profile" : "netflowecs-ipcount",
>     "foreach" : "MAP_GET('source_ipv4_address', netflow)",
>     "result" : {
>       "profile" : "count",
>       "triage" : { }
>     },
>     "onlyif" : "source.type == 'netflow-ecs' &&
> IN_SUBNET(MAP_GET('source_ipv4_address', netflow),'192.168.0.0/16','
> 10.0.0.0/8','172.16.0.0/12')",
>     "init" : {
>       "count" : "0"
>     },
>     "update" : {
>       "count" : "count + 1"
>     },
>     "groupBy" : [ ]
>   }, {
>     "profile" : "auditbeat-ipcount",
>     "foreach" : "MAP_GET('ip', source)",
>     "result" : {
>       "profile" : "count",
>       "triage" : { }
>     },
>     "onlyif" : "source.type == 'auditbeat'",
>     "init" : {
>       "count" : "0"
>     },
>     "update" : {
>       "count" : "count + 1"
>     },
>     "groupBy" : [ ]
>   } ],
>   "timestampField" : "timestamp"
> }
>
>
> We only changed profiler.workers to 2. And we had this error on
> kafkaSpout, builderbolt and splitterBolt:
>
> "java.lang.OutOfMemoryError: GC overhead limit exceeded at
> clojure.core$str.doInvoke(core.clj:528) at
> clojure.lang.RestFn.invoke(RestFn.java:516) at
> org.apache.storm.daemon.task$mk_tasks_fn$fn__9830.invoke(task.clj:153) at
> org.apache.storm.daemon.task$send_unanchored.invoke(task.clj:119) at
> org.apache.storm.daemon.executor$fn__10149$fn__10164$send_spout_msg__10170.invoke(executor.clj:600)
> at
> org.apache.storm.daemon.executor$fn__10149$fn$reify__10181.emit(executor.clj:624)
> at
> org.apache.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:50)
> at
> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:488)
> at
> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:443)
> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:311)
> at
> org.apache.storm.daemon.executor$fn__10149$fn__10164$fn__10197.invoke(executor.clj:660)
> at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) at
> clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745)".
>
>
> Then we set "topology.max.spout.pending" to 1000. As the result: nothing
> emitted from builderBolt to hbaseBolt:
>
> kafkaSpout   emitted-1260, trasferred-1260, complete latency - 1403.083,
> acked - 480
> splitterBolt emitted-1020, trasferred-1020, capacity - 0.018, executed -
> 1300, process latency - 1.348, acked - 1320
> builderBolt  emitted-0,    trasferred-0,    capacity - 0.001, executed -
> 1000, process latency - 0.000, acked - 0
> hbaseBolt    emitted-0,    trasferred-0,    capacity - 0.000, executed -
> 0,    process latency - 0,     acked - 0
> kafkaBolt    emitted-0,    trasferred-0,    capacity - 0.000, executed -
> 0,    process latency - 0,     acked - 0
>
>
> And nothing has been changed for a long period of time.
>
> After this we set "topology.max.spout.pending" to 10000. And got this:
>
> kafkaSpout   emitted-35940, trasferred-35940, complete latency -
> 30939.963, acked - 27200
> splitterBolt emitted-29380, trasferred-29380, capacity - 0.320, executed -
> 37180, process latency - 0.954,     acked - 37180
> builderBolt  emitted-400,   trasferred-400,   capacity - 0.001, executed -
> 29460, process latency - 30324.014, acked - 21840
> hbaseBolt    emitted-0,     trasferred-0,     capacity - 2.707, executed -
> 340,   process latency - 1475.471,  acked - 340
> kafkaBolt    emitted-0,     trasferred-0,     capacity - 0.000, executed -
> 0,     process latency - 0,         acked - 0
>
>
> Topology processed some events, but in one moment process stoped and we
> had this errors on kafkaSpout:
>
> "org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> message processing. You can address this either by increasing the session
> timeout or by reducing the maximum size of batches returned in poll() with
> max.poll.records. at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1416)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1377)
> at
> org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:534)
> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:293)
> at
> org.apache.storm.daemon.executor$fn__10149$fn__10164$fn__10197.invoke(executor.clj:660)
> at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) at
> clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745)"
>
>
> and errors on splitterBolt and builderBolt:
>
> "java.lang.OutOfMemoryError: GC overhead limit exceeded at
> java.util.Arrays.copyOfRange(Arrays.java:3664) at
> java.lang.String.<init>(String.java:207) at
> java.lang.StringBuilder.toString(StringBuilder.java:407) at
> java.util.AbstractCollection.toString(AbstractCollection.java:464) at
> org.apache.storm.tuple.TupleImpl.toString(TupleImpl.java:227) at
> clojure.core$str.invoke(core.clj:526) at
> clojure.core$str$fn__4188.invoke(core.clj:530) at
> clojure.core$str.doInvoke(core.clj:528) at
> clojure.lang.RestFn.invoke(RestFn.java:460) at
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10138.invoke(executor.clj:471)
> at
> org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41)
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
> at
> org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
> at
> org.apache.storm.daemon.executor$fn__10219$fn__10232$fn__10287.invoke(executor.clj:868)
> at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) at
> clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745)".
>
>
> The question is how parameter "topology.max.spout.pending" depend on
> amount of events?
>
> Why we didn't see any emitted data from builderBolt with small
> "topology.max.spout.pending"?
>
> Why we had errors when "topology.max.spout.pending" was 10000?
>

Reply via email to