We are now trying to tune the performance of the profiler topology. The profiler config file does not expose many parameters for this, so we tried changing "topology.max.spout.pending", but we cannot understand how profiler performance depends on this parameter.
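As far as we understand, this is the same limit that Storm exposes programmatically as Config.setMaxSpoutPending: the maximum number of tuples emitted by one spout task that may still be un-acked before Storm stops calling nextTuple() on it. A minimal Java sketch of the knob we mean (the wrapper class is made up for illustration):

import org.apache.storm.Config;

public class PendingLimitSketch {
    public static Config withPendingLimit(int maxPending) {
        Config conf = new Config();
        // Equivalent to topology.max.spout.pending in the topology configuration:
        // once this many tuples emitted by a spout task are still waiting for an
        // ack or fail, Storm stops calling nextTuple() on that task.
        // At our rate of ~6000-7000 events/minute (see below), 1000 pending tuples
        // is under 10 seconds of input and 10000 is roughly 1.5 minutes.
        conf.setMaxSpoutPending(maxPending);
        return conf;
    }
}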
We receive about 6000-7000 events per minute. We changed "profiler.period.duration" to 1 minute and used the following profiler configuration:
{
  "profiles" : [ {
    "profile" : "netflowecs-ipcount",
    "foreach" : "MAP_GET('source_ipv4_address', netflow)",
    "result" : {
      "profile" : "count",
      "triage" : { }
    },
    "onlyif" : "source.type == 'netflow-ecs' && IN_SUBNET(MAP_GET('source_ipv4_address', netflow),'192.168.0.0/16','10.0.0.0/8','172.16.0.0/12')",
    "init" : {
      "count" : "0"
    },
    "update" : {
      "count" : "count + 1"
    },
    "groupBy" : [ ]
  }, {
    "profile" : "auditbeat-ipcount",
    "foreach" : "MAP_GET('ip', source)",
    "result" : {
      "profile" : "count",
      "triage" : { }
    },
    "onlyif" : "source.type == 'auditbeat'",
    "init" : {
      "count" : "0"
    },
    "update" : {
      "count" : "count + 1"
    },
    "groupBy" : [ ]
  } ],
  "timestampField" : "timestamp"
}
Apart from that, the only topology setting we changed was profiler.workers, which we set to 2. With that, we got this error on kafkaSpout, builderBolt and splitterBolt:

java.lang.OutOfMemoryError: GC overhead limit exceeded
    at clojure.core$str.doInvoke(core.clj:528)
    at clojure.lang.RestFn.invoke(RestFn.java:516)
    at org.apache.storm.daemon.task$mk_tasks_fn$fn__9830.invoke(task.clj:153)
    at org.apache.storm.daemon.task$send_unanchored.invoke(task.clj:119)
    at org.apache.storm.daemon.executor$fn__10149$fn__10164$send_spout_msg__10170.invoke(executor.clj:600)
    at org.apache.storm.daemon.executor$fn__10149$fn$reify__10181.emit(executor.clj:624)
    at org.apache.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:50)
    at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:488)
    at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:443)
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:311)
    at org.apache.storm.daemon.executor$fn__10149$fn__10164$fn__10197.invoke(executor.clj:660)
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)
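So at that point the relevant lines of our profiler properties file looked roughly like this (only the values we touched are shown; topology.max.spout.pending was not set at all yet):

# period of 1 minute, as described above
profiler.period.duration=1
profiler.workers=2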
Then we set "topology.max.spout.pending" to 1000. As the result: nothing
emitted from builderBolt to hbaseBolt:
kafkaSpout: emitted - 1260, transferred - 1260, complete latency - 1403.083, acked - 480
splitterBolt: emitted - 1020, transferred - 1020, capacity - 0.018, executed - 1300, process latency - 1.348, acked - 1320
builderBolt: emitted - 0, transferred - 0, capacity - 0.001, executed - 1000, process latency - 0.000, acked - 0
hbaseBolt: emitted - 0, transferred - 0, capacity - 0.000, executed - 0, process latency - 0, acked - 0
kafkaBolt: emitted - 0, transferred - 0, capacity - 0.000, executed - 0, process latency - 0, acked - 0
Nothing changed for a long time after that.
After this we set "topology.max.spout.pending" to 10000 and got this:
kafkaSpout: emitted - 35940, transferred - 35940, complete latency - 30939.963, acked - 27200
splitterBolt: emitted - 29380, transferred - 29380, capacity - 0.320, executed - 37180, process latency - 0.954, acked - 37180
builderBolt: emitted - 400, transferred - 400, capacity - 0.001, executed - 29460, process latency - 30324.014, acked - 21840
hbaseBolt: emitted - 0, transferred - 0, capacity - 2.707, executed - 340, process latency - 1475.471, acked - 340
kafkaBolt: emitted - 0, transferred - 0, capacity - 0.000, executed - 0, process latency - 0, acked - 0
The topology processed some events, but at some point processing stopped and we got these errors on kafkaSpout:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1416)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1377)
    at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:534)
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:293)
    at org.apache.storm.daemon.executor$fn__10149$fn__10164$fn__10197.invoke(executor.clj:660)
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)
and these errors on splitterBolt and builderBolt:

java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOfRange(Arrays.java:3664)
    at java.lang.String.<init>(String.java:207)
    at java.lang.StringBuilder.toString(StringBuilder.java:407)
    at java.util.AbstractCollection.toString(AbstractCollection.java:464)
    at org.apache.storm.tuple.TupleImpl.toString(TupleImpl.java:227)
    at clojure.core$str.invoke(core.clj:526)
    at clojure.core$str$fn__4188.invoke(core.clj:530)
    at clojure.core$str.doInvoke(core.clj:528)
    at clojure.lang.RestFn.invoke(RestFn.java:460)
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10138.invoke(executor.clj:471)
    at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41)
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
    at org.apache.storm.daemon.executor$fn__10219$fn__10232$fn__10287.invoke(executor.clj:868)
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)
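The settings the CommitFailedException message points at (max.poll.interval.ms and max.poll.records) are ordinary Kafka consumer properties. On a plain KafkaConsumer they would be set roughly like the sketch below (broker address, group id and the concrete values are placeholders, and we are not sure yet how the profiler topology passes such properties through to its Kafka spout):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerPollTuningSketch {
    public static KafkaConsumer<String, String> build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:6667");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "profiler-group");         // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // Smaller batches make each poll() cycle finish sooner ...
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);              // placeholder value
        // ... and/or give the consumer more time between polls before the group rebalances.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);       // placeholder value
        return new KafkaConsumer<>(props);
    }
}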
The questions are: how does the "topology.max.spout.pending" parameter relate to the number of incoming events?
Why didn't we see any data emitted from builderBolt with a small "topology.max.spout.pending"?
Why did we get these errors when "topology.max.spout.pending" was 10000?