Github user avermeer commented on the issue:

    https://github.com/apache/storm/pull/2241
  
    Hello Roshan,
    
    Thank you very much for your huge work for improving Storm performances!
    
    Regarding "competitive perf evaluation", would you say that now the reasons
    why SuperCheif team moved away from Storm to their homebrewed streaming
    processing system in 2015 (see http://blog.librato.com/posts/superchief) no
    longer hold?
    
    Best regards,
    Alexandre Vermeerbergen
    
    2017-07-25 8:58 GMT+02:00 Satish Duggana <notificati...@github.com>:
    
    > *@satishd* commented on this pull request.
    >
    > Nice Work Roshan!
    > Had an initial look at the code and left few comments(mostly minor),
    > Overall LGTM.
    > ------------------------------
    >
    > In conf/defaults.yaml
    > <https://github.com/apache/storm/pull/2241#discussion_r129208440>:
    >
    > > @@ -146,7 +149,7 @@ supervisor.run.worker.as.user: false
    >  #how long supervisor will wait to ensure that a worker process is started
    >  supervisor.worker.start.timeout.secs: 120
    >  #how long between heartbeats until supervisor considers that worker dead 
and tries to restart it
    > -supervisor.worker.timeout.secs: 30
    > +supervisor.worker.timeout.secs: 30000
    >
    > Is this really a deliberate change?
    > ------------------------------
    >
    > In conf/defaults.yaml
    > <https://github.com/apache/storm/pull/2241#discussion_r129208570>:
    >
    > > @@ -253,11 +247,16 @@ topology.trident.batch.emit.interval.millis: 500
    >  topology.testing.always.try.serialize: false
    >  topology.classpath: null
    >  topology.environment: null
    > -topology.bolts.outgoing.overflow.buffer.enable: false
    > -topology.disruptor.wait.timeout.millis: 1000
    > -topology.disruptor.batch.size: 100
    > -topology.disruptor.batch.timeout.millis: 1
    > -topology.disable.loadaware.messaging: false
    > +topology.bolts.outgoing.overflow.buffer.enable: false # TODO: Roshan : 
Whats this ?
    > +topology.disruptor.wait.timeout.millis: 1000  # TODO: Roshan: not used, 
but we may/not want this behavior
    > +topology.transfer.buffer.size: 50000
    > +topology.transfer.batch.size: 10
    > +topology.executor.receive.buffer.size: 50000
    > +topology.producer.batch.size: 1000  # TODO: Roshan:  rename
    > +topology.flush.tuple.freq.millis: 5000
    >
    > nit: Better to add a comment describing about this property.
    > ------------------------------
    >
    > In conf/defaults.yaml
    > <https://github.com/apache/storm/pull/2241#discussion_r129208678>:
    >
    > > @@ -304,6 +303,7 @@ storm.cgroup.resources:
    >  storm.cgroup.hierarchy.name: "storm"
    >  storm.supervisor.cgroup.rootdir: "storm"
    >  storm.cgroup.cgexec.cmd: "/bin/cgexec"
    > +storm.cgroup.cgexec.cmd: "/bin/cgexec"
    >
    > may be an accidental copy, needs to be removed.
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129209017>:
    >
    > > @@ -30,4 +31,5 @@
    >      void ack(Tuple input);
    >      void fail(Tuple input);
    >      void resetTimeout(Tuple input);
    > +    void flush();
    >
    > May want to add some javadoc about the same. It seems we are ready to
    > break the APIs with new set of changes in this redesign.
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129212155>:
    >
    > >
    > -        for (Map.Entry<Integer, List<AddressedTuple>> entry : 
grouped.entrySet()) {
    > -            DisruptorQueue queue = 
shortExecutorReceiveQueueMap.get(entry.getKey());
    > -            if (null != queue) {
    > -                queue.publish(entry.getValue());
    > -            } else {
    > -                LOG.warn("Received invalid messages for unknown tasks. 
Dropping... ");
    > +    private void transferLocalBatch(List<AddressedTuple> tupleBatch) {
    > +        try {
    > +            for (int i = 0; i < tupleBatch.size(); i++) {
    >
    > Does foreach have significant perf issue?
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/daemon/Acker.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129214432>:
    >
    > > @@ -66,6 +67,7 @@ public void prepare(Map<String, Object> topoConf, 
TopologyContext context, Outpu
    >
    >      @Override
    >      public void execute(Tuple input) {
    > +        long start = System.currentTimeMillis();
    >
    > nit: start is never used.
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129214771>:
    >
    > > @@ -137,7 +137,7 @@ public void prepare(WorkerTopologyContext context, 
GlobalStreamId stream, List<I
    >      public static class FieldsGrouper implements CustomStreamGrouping {
    >
    >          private Fields outFields;
    > -        private List<Integer> targetTasks;
    > +        private ArrayList<List<Integer> > targetTasks;
    >
    > nit: No need to change from List to ArrayList.
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/daemon/Task.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129215798>:
    >
    > > @@ -246,4 +291,26 @@ private void addTaskHooks() {
    >          }
    >      }
    >
    > +    private static HashMap<String, 
ArrayList<LoadAwareCustomStreamGrouping>> getGroupersPerStream(Map<String, 
Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper) {
    >
    > nit: This can return Map<String, List<LoadAwareCustomStreamGrouping>
    > instead of implementations.
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/daemon/Task.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129216374>:
    >
    > >      public List<Integer> getOutgoingTasks(String stream, List<Object> 
values) {
    >          if (debug) {
    >              LOG.info("Emitting Tuple: taskId={} componentId={} stream={} 
values={}", taskId, componentId, stream, values);
    >          }
    >
    > -        List<Integer> outTasks = new ArrayList<>();
    > -        if (!streamComponentToGrouper.containsKey(stream)) {
    > -            throw new IllegalArgumentException("Unknown stream ID: " + 
stream);
    > -        }
    > -        if (null != streamComponentToGrouper.get(stream)) {
    > -            // null value for __system
    > -            for (LoadAwareCustomStreamGrouping grouper : 
streamComponentToGrouper.get(stream).values()) {
    > +        ArrayList<Integer> outTasks = new ArrayList<>();
    > +
    > +        // TODO: PERF: expensive hashtable lookup in critical path
    >
    > Is this an expensive hit? This map may not contain many keys(no of streams
    > defined for this task).
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129217706>:
    >
    > >          this.outputCollectors = new ArrayList<>();
    > -        for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
    > -            Task taskData = entry.getValue();
    > +        for (int i=0; i<idToTask.size(); ++i) {
    >
    > Why did we replace with this instead of using foreach with
    > idToTask.entrySet() as that does not need any map lookups?
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129217748>:
    >
    > >          this.outputCollectors = new ArrayList<>();
    > -        for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
    > -            Task taskData = entry.getValue();
    > +        for (int i=0; i<idToTask.size(); ++i) {
    > +            Task taskData = idToTask.get(i);
    > +            if (taskData==null)
    >
    > Is this a valid condition? I guess taskData can never be null.
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129218102>:
    >
    > >
    > -    public ExecutorShutdown(Executor executor, List<Utils.SmartThread> 
threads, Map<Integer, Task> taskDatas) {
    > +    public ExecutorShutdown(Executor executor, List<Utils.SmartThread> 
threads, ArrayList<Task> taskDatas) {
    >
    > nit: taskDatas as List
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129218772>:
    >
    > >          }
    >      }
    > +
    > +    private void flushRemotes() throws InterruptedException {
    > +        workerData.flushRemotes();
    > +    }
    > +
    > +    public boolean transferLocal(AddressedTuple tuple) throws 
InterruptedException {
    > +        workerData.checkSerialize(serializer, tuple);
    >
    > I guess this should be done only when it is determined to be sending to
    > local task. So, this should be pushed to just before queue.publish(tuple)
    > below.
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129219489>:
    >
    > >      private ArrayList<List<Integer>> choices;
    > -    private AtomicInteger current;
    > +    private int current = 0 ;
    >
    > You changed this to non threadsafe. Is this instance not shared by
    > multiple components?
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129220412>:
    >
    > >          }
    >      }
    >
    > -    public TupleImpl(GeneralTopologyContext context, List<Object> 
values, int taskId, String streamId, MessageId id) {
    > +    public TupleImpl(GeneralTopologyContext context, List<Object> 
values, String srcComponent, int taskId, String streamId, MessageId id) {
    >
    > Why do we need to have this to be passed in constructor as that can be
    > derived like below from the existing arguments? This constructor change is
    > spread across all usages.
    > srcComponent = context.getComponentId(taskId)
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129222974>:
    >
    > >    private static final Logger LOG = 
LoggerFactory.getLogger(TransferDrainer.class);
    > -
    > -  public void add(HashMap<Integer, ArrayList<TaskMessage>> 
taskTupleSetMap) {
    > -    for (Map.Entry<Integer, ArrayList<TaskMessage>> entry : 
taskTupleSetMap.entrySet()) {
    > -      addListRefToMap(this.bundles, entry.getKey(), entry.getValue());
    > +
    > +  // Cache the msgs grouped by destination node
    > +  public void add(TaskMessage taskMsg) {
    > +    int destId = taskMsg.task();
    > +    ArrayList<TaskMessage> msgs = bundles.get(destId);
    >
    > we can have bundles.computeIfAbsent(destId, integer -> new ArrayList<>());
    > removing null check, creating list and put that into bundles map.
    > ------------------------------
    >
    > In storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
    > <https://github.com/apache/storm/pull/2241#discussion_r129223991>:
    >
    > >    private static final Logger LOG = 
LoggerFactory.getLogger(TransferDrainer.class);
    > -
    > -  public void add(HashMap<Integer, ArrayList<TaskMessage>> 
taskTupleSetMap) {
    > -    for (Map.Entry<Integer, ArrayList<TaskMessage>> entry : 
taskTupleSetMap.entrySet()) {
    > -      addListRefToMap(this.bundles, entry.getKey(), entry.getValue());
    > +
    > +  // Cache the msgs grouped by destination node
    > +  public void add(TaskMessage taskMsg) {
    >
    > It seems add and send are always invoked in the same thread from
    > (JCQueue.Consumer's accept and flush) and there is no contention. Is that
    > right?
    >
    > —
    > You are receiving this because you are subscribed to this thread.
    > Reply to this email directly, view it on GitHub
    > <https://github.com/apache/storm/pull/2241#pullrequestreview-51955774>,
    > or mute the thread
    > 
<https://github.com/notifications/unsubscribe-auth/ARCSMfGDBX-tlWGOS3zMhYmPFFFA8yMYks5sRZIvgaJpZM4OiB-Q>
    > .
    >



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to