We are trying to tune the performance of the profiler topology. The profiler 
config file has few parameters for this, so we tried changing 
"topology.max.spout.pending". We can't understand how profiler performance 
depends on this parameter. 
 
We have about 6000-7000 events per minute. We changed 
"profiler.period.duration" to 1 minute and used the following profiler 
configuration:
 
 {
  "profiles" : [ {
    "profile" : "netflowecs-ipcount",
    "foreach" : "MAP_GET('source_ipv4_address', netflow)",
    "result" : {
      "profile" : "count",
      "triage" : { }
    },
    "onlyif" : "source.type == 'netflow-ecs' && IN_SUBNET(MAP_GET('source_ipv4_address', netflow),'192.168.0.0/16','10.0.0.0/8','172.16.0.0/12')",
    "init" : {
      "count" : "0"
    },
    "update" : {
      "count" : "count + 1"
    },
    "groupBy" : [ ]
  }, {
    "profile" : "auditbeat-ipcount",
    "foreach" : "MAP_GET('ip', source)",
    "result" : {
      "profile" : "count",
      "triage" : { }
    },
    "onlyif" : "source.type == 'auditbeat'",
    "init" : {
      "count" : "0"
    },
    "update" : {
      "count" : "count + 1"
    },
    "groupBy" : [ ]
  } ],
  "timestampField" : "timestamp"
}
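
To make the intent of the first profile concrete, here is a plain Python sketch of what "netflowecs-ipcount" is meant to compute within one period: a count of netflow-ecs events per source IP, restricted to the three private subnets. This is not Stellar and not how Metron executes the profile; the function names and the sample messages are hypothetical and exist only for illustration.

```python
import ipaddress
from collections import Counter

# The same three private ranges used in the profile's "onlyif" expression.
PRIVATE_SUBNETS = [
    ipaddress.ip_network(cidr)
    for cidr in ("192.168.0.0/16", "10.0.0.0/8", "172.16.0.0/12")
]


def in_subnet(ip, subnets=PRIVATE_SUBNETS):
    """Rough stand-in for IN_SUBNET: True if ip falls in any subnet."""
    addr = ipaddress.ip_address(ip)
    return any(addr in net for net in subnets)


def netflow_ipcount(messages):
    """Count matching events per source IP (the "foreach" entity)."""
    counts = Counter()  # "init": every entity starts at 0
    for msg in messages:
        ip = msg.get("netflow", {}).get("source_ipv4_address")
        if msg.get("source.type") == "netflow-ecs" and ip and in_subnet(ip):
            counts[ip] += 1  # "update": count + 1
    return counts


# Hypothetical messages, shaped only as far as the profile requires.
sample = [
    {"source.type": "netflow-ecs", "netflow": {"source_ipv4_address": "10.1.2.3"}},
    {"source.type": "netflow-ecs", "netflow": {"source_ipv4_address": "10.1.2.3"}},
    {"source.type": "netflow-ecs", "netflow": {"source_ipv4_address": "192.168.0.7"}},
    {"source.type": "netflow-ecs", "netflow": {"source_ipv4_address": "8.8.8.8"}},
    {"source.type": "auditbeat", "source": {"ip": "10.9.9.9"}},
]

print(netflow_ipcount(sample))
```

Each distinct IP is a separate profile entity, so the number of entities the profiler tracks per period grows with the number of distinct private source addresses seen, not only with the raw event count.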
 

The only other change we made was setting profiler.workers to 2. We then got 
this error on kafkaSpout, builderBolt and splitterBolt:
 
"java.lang.OutOfMemoryError: GC overhead limit exceeded
 at clojure.core$str.doInvoke(core.clj:528)
 at clojure.lang.RestFn.invoke(RestFn.java:516)
 at org.apache.storm.daemon.task$mk_tasks_fn$fn__9830.invoke(task.clj:153)
 at org.apache.storm.daemon.task$send_unanchored.invoke(task.clj:119)
 at org.apache.storm.daemon.executor$fn__10149$fn__10164$send_spout_msg__10170.invoke(executor.clj:600)
 at org.apache.storm.daemon.executor$fn__10149$fn$reify__10181.emit(executor.clj:624)
 at org.apache.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:50)
 at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:488)
 at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:443)
 at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:311)
 at org.apache.storm.daemon.executor$fn__10149$fn__10164$fn__10197.invoke(executor.clj:660)
 at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484)
 at clojure.lang.AFn.run(AFn.java:22)
 at java.lang.Thread.run(Thread.java:745)".


Then we set "topology.max.spout.pending" to 1000. As a result, nothing was 
emitted from builderBolt to hbaseBolt:
          
kafkaSpout   emitted-1260, transferred-1260, complete latency - 1403.083, acked - 480
splitterBolt emitted-1020, transferred-1020, capacity - 0.018, executed - 1300, process latency - 1.348, acked - 1320
builderBolt  emitted-0,    transferred-0,    capacity - 0.001, executed - 1000, process latency - 0.000, acked - 0
hbaseBolt    emitted-0,    transferred-0,    capacity - 0.000, executed - 0,    process latency - 0,     acked - 0
kafkaBolt    emitted-0,    transferred-0,    capacity - 0.000, executed - 0,    process latency - 0,     acked - 0


These numbers then stayed unchanged for a long time.
 
After this we set "topology.max.spout.pending" to 10000 and got this:
 
kafkaSpout   emitted-35940, transferred-35940, complete latency - 30939.963, acked - 27200
splitterBolt emitted-29380, transferred-29380, capacity - 0.320, executed - 37180, process latency - 0.954,     acked - 37180
builderBolt  emitted-400,   transferred-400,   capacity - 0.001, executed - 29460, process latency - 30324.014, acked - 21840
hbaseBolt    emitted-0,     transferred-0,     capacity - 2.707, executed - 340,   process latency - 1475.471,  acked - 340
kafkaBolt    emitted-0,     transferred-0,     capacity - 0.000, executed - 0,     process latency - 0,         acked - 0


The topology processed some events, but at some point processing stopped and 
we got these errors on kafkaSpout:
 
"org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
completed since the group has already rebalanced and assigned the partitions to 
another member. This means that the time between subsequent calls to poll() was 
longer than the configured max.poll.interval.ms, which typically implies that 
the poll loop is spending too much time message processing. You can address 
this either by increasing the session timeout or by reducing the maximum size 
of batches returned in poll() with max.poll.records.
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
 at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1416)
 at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1377)
 at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:534)
 at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:293)
 at org.apache.storm.daemon.executor$fn__10149$fn__10164$fn__10197.invoke(executor.clj:660)
 at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484)
 at clojure.lang.AFn.run(AFn.java:22)
 at java.lang.Thread.run(Thread.java:745)"


and these errors on splitterBolt and builderBolt: 
 
"java.lang.OutOfMemoryError: GC overhead limit exceeded
 at java.util.Arrays.copyOfRange(Arrays.java:3664)
 at java.lang.String.<init>(String.java:207)
 at java.lang.StringBuilder.toString(StringBuilder.java:407)
 at java.util.AbstractCollection.toString(AbstractCollection.java:464)
 at org.apache.storm.tuple.TupleImpl.toString(TupleImpl.java:227)
 at clojure.core$str.invoke(core.clj:526)
 at clojure.core$str$fn__4188.invoke(core.clj:530)
 at clojure.core$str.doInvoke(core.clj:528)
 at clojure.lang.RestFn.invoke(RestFn.java:460)
 at org.apache.storm.daemon.executor$mk_task_receiver$fn__10138.invoke(executor.clj:471)
 at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41)
 at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
 at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
 at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
 at org.apache.storm.daemon.executor$fn__10219$fn__10232$fn__10287.invoke(executor.clj:868)
 at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484)
 at clojure.lang.AFn.run(AFn.java:22)
 at java.lang.Thread.run(Thread.java:745)".


Our questions are:

How does the "topology.max.spout.pending" parameter depend on the number of 
events? 

Why didn't we see any data emitted from builderBolt with a small 
"topology.max.spout.pending"? 

Why did we get errors when "topology.max.spout.pending" was 10000? 
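
On the first question, one rough way to relate the two is Little's law: the average number of tuples in flight is about the arrival rate multiplied by the complete latency. The sketch below applies it to the event rate and the complete latency figures reported above. It is a back-of-the-envelope estimate, not a Metron or Storm sizing rule: the function name is hypothetical, the latencies are assumed to be in milliseconds, and complete latency only reflects tuples that were actually acked. Storm also applies "topology.max.spout.pending" per spout task, so the topology-wide cap depends on the spout parallelism.

```python
def tuples_in_flight(events_per_minute, complete_latency_ms):
    """Little's law: in-flight tuples = arrival rate * time in the system."""
    events_per_ms = events_per_minute / 60_000.0
    return events_per_ms * complete_latency_ms


# Complete latencies (assumed ms) observed in the two runs above.
observed = [
    ("max.spout.pending=1000", 1403.083),
    ("max.spout.pending=10000", 30939.963),
]

for label, latency_ms in observed:
    low = tuples_in_flight(6000, latency_ms)   # lower bound of the event rate
    high = tuples_in_flight(7000, latency_ms)  # upper bound of the event rate
    print(f"{label}: roughly {low:.0f} to {high:.0f} tuples in flight")
```

If the estimate is close to or above the configured cap, the spout would spend much of its time waiting for acks; if it is far below the cap, the cap is unlikely to be what limits throughput.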
