I confirmed that the reflection warning for that line is gone after
applying Abhishek's suggestion.

You can check for yourself:

mvn clean install -DskipTests
cd storm-core
mvn clojure:repl
repl> (set! *warn-on-reflection* true)
repl> (load-file "src/clj/org/apache/storm/daemon/executor.clj")
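
For reference, the change is just the type hint at the call site. A minimal
sketch of the load-aware branch of mk-custom-grouper (the other branch is
elided):

(if (instance? LoadAwareCustomStreamGrouping grouping)
  (fn [task-id ^List values load]
    ;; hint lets the compiler resolve the 3-arg chooseTasks statically
    (.chooseTasks ^LoadAwareCustomStreamGrouping grouping task-id values load))
  ...)

With the hint in place the Clojure compiler binds the method at compile time
instead of reflecting on every call.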

Roshan, you may want to apply the suggestion and try profiling again.

Abhishek,
shuffle (or local-or-shuffle) grouping refers to mk-shuffle-grouper, and when
topology.disable.loadaware.messaging is not true (the default),
LoadAwareShuffleGrouping (which is a LoadAwareCustomStreamGrouping) is used.
You can refer to mk-grouper / mk-shuffle-grouper / mk-custom-grouper for
details.
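
A condensed sketch of that selection (a paraphrase, not the verbatim
storm-core source; argument names and the plain-shuffle branch are
simplified):

(defn mk-shuffle-grouper*          ; paraphrase of mk-shuffle-grouper
  [target-tasks topo-conf context comp-id stream-id]
  (if-not (get topo-conf "topology.disable.loadaware.messaging")
    ;; default: load-aware shuffle, prepared like any custom grouper
    (mk-custom-grouper (LoadAwareShuffleGrouping.)
                       context comp-id stream-id target-tasks)
    ;; otherwise: a plain, non-load-aware shuffle (elided)
    ...))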

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Feb 4, 2016 at 2:46 PM, Abhishek Agarwal <abhishc...@gmail.com> wrote:

> (.chooseTasks ^LoadAwareCustomStreamGrouping grouping task-id values load))
>
> Does the above work? You can also give the type hint for the target tasks
> (^List target-tasks).
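>
> A minimal sketch with that hint in place (the parameter vector is
> paraphrased from mk-custom-grouper below; only the ^List hint on
> target-tasks is new):
>
> (defn mk-custom-grouper
>   [^CustomStreamGrouping grouping ^WorkerTopologyContext context
>    ^String component-id ^String stream-id ^List target-tasks]
>   ...)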
>
> It is not clear to me why this code path will even be invoked, since you
> are not using LoadAwareCustomStreamGrouping in your topology.
>
> On Thu, Feb 4, 2016 at 9:56 AM, Roshan Naik <ros...@hortonworks.com>
> wrote:
>
> > Looks like a simple fix. Unfortunately I don't know enough Clojure to fix
> > it.
> >
> > Narrowed down the performance issue to this Clojure code in executor.clj:
> >
> >
> > (defn mk-custom-grouper
> >   [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String
> > component-id ^String stream-id target-tasks]
> >   (.prepare grouping context (GlobalStreamId. component-id stream-id)
> > target-tasks)
> >   (if (instance? LoadAwareCustomStreamGrouping grouping)
> >      (fn [task-id ^List values load]
> >         (.chooseTasks grouping task-id values load))  ; <<< problematic invocation
> >      (fn [task-id ^List values load]
> >         (.chooseTasks grouping task-id values))))
> >
> >
> >
> >
> > 'grouping' is statically typed to the base type CustomStreamGrouping. In
> > this run, its actual type is the derived type
> > LoadAwareCustomStreamGrouping.
> > The base type does not have a three-argument chooseTasks() method; only
> > the derived type does. Consequently Clojure falls back to dynamically
> > iterating over the methods of the 'grouping' object to locate the right
> > method and then invoke it via reflection. This sits in the critical path
> > of SpoutOutputCollector.emit(), where roughly 20% of the time goes just
> > to finding the right method.
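> >
> > The same mechanism in miniature, with a stock class (an illustration of
> > the dispatch difference only, not taken from this profile):
> >
> > (set! *warn-on-reflection* true)
> > (defn slow [s] (.length s))          ; reflection warning: type of s unknown
> > (defn fast [^String s] (.length s))  ; resolved at compile time, no reflection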
> >
> > I tried a few things, but was unable to force a cast to
> > LoadAwareCustomStreamGrouping there or enable more efficient dispatch.
> >
> > If anyone knows how to fix it, I can try it and rerun the numbers.
> >
> > Since it appears to be an easy fix, we can do this w/o waiting for
> > CLJ-1784 or replacement of clojure subsystem.
> >
> > -roshan
> >
> >
> >
> >
> >
> > On 2/3/16, 12:03 AM, "Abhishek Agarwal" <abhishc...@gmail.com> wrote:
> >
> > >Thanks for sharing. This is very helpful.
> > >Regarding the reflection cost, it seems there is already a ticket open in
> > >Clojure:
> > >http://dev.clojure.org/jira/browse/CLJ-1784
> > >
> > >In the discussion thread, it's been suggested to turn on the
> > >*warn-on-reflection*
> > ><https://clojuredocs.org/clojure.core/*warn-on-reflection*> property and
> > >to add type hints. I am new to Clojure, so I can't say exactly how it
> > >will work out.
> > >
> > >The second one could be an indicator of another problem. The function you
> > >have cited is called on the consumer path, which means messages are not
> > >flowing in fast enough for the consumers. This behavior is coupled with
> > >the load pattern and topology parameters such as queue size. At what rate
> > >are you generating the load, and what is the size of the disruptor queue?
> > >Also, if your spout is slower than the bolts, this behavior is very much
> > >expected, isn't it?
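> > >
> > >For reference, the disruptor queue sizes come from config keys like
> > >these (the values shown are the usual defaults and are illustrative
> > >only; check defaults.yaml for your version):
> > >
> > >topology.executor.receive.buffer.size: 1024
> > >topology.executor.send.buffer.size: 1024
> > >topology.transfer.buffer.size: 1024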
> > >
> > >On Wed, Feb 3, 2016 at 11:54 AM, Roshan Naik <ros...@hortonworks.com>
> > >wrote:
> > >
> > >> Looks like the attachments were stripped off, so resending with links
> > >> to the profiler screenshots.
> > >>
> > >>  Call tree:
> > >>
> > >> https://github.com/roshannaik/storm-benchmark-sol/blob/master/profiler/storm%20core%20-%20sol%20-%200%20acker/storm%20core%20-%20call%20tree.png
> > >>  Method stats:
> > >>
> > >> https://github.com/roshannaik/storm-benchmark-sol/blob/master/profiler/storm%20core%20-%20sol%20-%200%20acker/storm%20core%20-%20method%20stats.png
> > >>
> > >> -roshan
> > >>
> > >>
> > >> From: Roshan Naik <ros...@hortonworks.com>
> > >> Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
> > >> Date: Monday, February 1, 2016 at 6:38 PM
> > >> To: "dev@storm.apache.org" <dev@storm.apache.org>
> > >> Subject: Performance Profiling - Storm core
> > >>
> > >> Attached are screenshots of the performance profile for Storm core
> > >> using a Speed of Light topology.
> > >>
> > >> Topology info:
> > >> - 1 bolt instance, 1 spout instance, 1 worker.
> > >> - ACKer count = 0
> > >> - Spout precomputes a random list of tuples, then keeps emitting them
> > >>   endlessly
> > >> - Bolt just re-emits the same tuple and acks it
> > >> - localOrShuffleGrouping
> > >> - Topology Code :
> > >>
> > >>
> > >> https://github.com/roshannaik/storm-benchmark-sol/blob/master/src/main/java/storm/benchmark/benchmarks/SOL.java
> > >>
> > >>
> > >> Observations:
> > >>
> > >>   *   The call tree shows that a big part of each nextTuple() invocation
> > >> is consumed in the Collector.emit() call; a major part of that goes to
> > >> reflection in the Clojure code
> > >>   *   The method stats view shows that a lot of time is spent blocking
> > >> on the disruptor queue
> > >>
> > >>
> > >>
> > >
> > >
> > >--
> > >Regards,
> > >Abhishek Agarwal
> >
> >
>
>
> --
> Regards,
> Abhishek Agarwal
>
