Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1098#discussion_r52749466
  
    --- Diff: storm-core/src/jvm/org/apache/storm/StormTimer.java ---
    @@ -0,0 +1,231 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm;
    +
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Comparator;
    +import java.util.Random;
    +import java.util.concurrent.PriorityBlockingQueue;
    +import java.util.concurrent.Semaphore;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +/**
    + * The timer defined in this file is very similar to java.util.Timer, except
    + * it integrates with Storm's time simulation capabilities. This lets us test
    + * code that does asynchronous work on the timer thread
    + */
    +
    +public class StormTimer {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(StormTimer.class);
    +
    +    public interface TimerFunc {
    +        public void run(Object o);
    +    }
    +
    +    public static class QueueEntry {
    +        public final Long endTimeMs;
    +        public final TimerFunc afn;
    +        public final String id;
    +
    +        public QueueEntry(Long endTimeMs, TimerFunc afn, String id) {
    +            this.endTimeMs = endTimeMs;
    +            this.afn = afn;
    +            this.id = id;
    +        }
    +
    +        @Override
    +        public String toString() {
    +            return this.id + " " + this.endTimeMs + " " + this.afn;
    +        }
    +    }
    +
    +    public static class StormTimerTask extends Thread {
    +
    +        private PriorityBlockingQueue<QueueEntry> queue = new 
PriorityBlockingQueue<QueueEntry>(10, new Comparator() {
    +            @Override
    +            public int compare(Object o1, Object o2) {
    +                return ((QueueEntry)o1).endTimeMs.intValue() - 
((QueueEntry)o2).endTimeMs.intValue();
    +            }
    +        });
    +
    +        private AtomicBoolean active = new AtomicBoolean(false);
    +
    +        private TimerFunc onKill;
    +
    +        private Random random = new Random();
    +
    +        private Semaphore cancelNotifier = new Semaphore(0);
    +
    +        private Object lock = new Object();
    +
    +        @Override
    +        public void run() {
    +            while (this.active.get()) {
    +                QueueEntry queueEntry = null;
    +                try {
    +                    synchronized (this.lock) {
    +                        queueEntry = this.queue.peek();
    +                    }
    +                    if ((queueEntry != null) && (Time.currentTimeMillis() 
>= queueEntry.endTimeMs)) {
    +                        // It is imperative to not run the function
    +                        // inside the timer lock. Otherwise, it is
    +                        // possible to deadlock if the fn deals with
    +                        // other locks, like the submit lock.
    +                        synchronized (this.lock) {
    +                            this.queue.poll();
    +                        }
    +                        queueEntry.afn.run(null);
    +                    } else if (queueEntry != null) {
    +                        //  If any events are scheduled, sleep until
    +                        // event generation. If any recurring events
    +                        // are scheduled then we will always go
    +                        // through this branch, sleeping only the
    +                        // exact necessary amount of time. We give
    +                        // an upper bound, e.g. 1000 millis, to the
    +                        // sleeping time, to limit the response time
    +                        // for detecting any new event within 1 secs.
    +                        Time.sleep(Math.min(1000, (queueEntry.endTimeMs - 
Time.currentTimeMillis())));
    +                    } else {
    +                        // Otherwise poll to see if any new event
    +                        // was scheduled. This is, in essence, the
    +                        // response time for detecting any new event
    +                        // schedulings when there are no scheduled
    +                        // events.
    +                        Time.sleep(1000);
    +                    }
    +                } catch (Throwable t) {
    +                    if 
(!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t))) {
    +                        this.onKill.run(t);
    +                        this.setActive(false);
    +                        throw new RuntimeException(t);
    +                    }
    +                }
    +            }
    +            this.cancelNotifier.release();
    +        }
    +
    +        public void setOnKillFunc(TimerFunc onKill) {
    +            this.onKill = onKill;
    +        }
    +
    +        public void setActive(boolean flag) {
    +            this.active.set(flag);
    +        }
    +
    +        public boolean isActive() {
    +            return this.active.get();
    +        }
    +
    +        public void add(QueueEntry queueEntry) {
    +            this.queue.add(queueEntry);
    +        }
    +    }
    +
    +    public static StormTimerTask mkTimer(String name, TimerFunc onKill) {
    +        if (onKill == null) {
    +            throw new RuntimeException("onKill func is null!");
    +        }
    +        StormTimerTask task  = new StormTimerTask();
    +        if (name == null) {
    +            task.setName("timer");
    +        } else {
    +            task.setName(name);
    +        }
    +        task.setOnKillFunc(onKill);
    +        task.setActive(true);
    +
    +        task.setDaemon(true);
    +        task.setPriority(Thread.MAX_PRIORITY);
    +        task.start();
    +        return task;
    +    }
    +    public static void schedule(StormTimerTask task, int delaySecs, 
TimerFunc afn, boolean checkActive, int jitterMs) {
    --- End diff --
    
    will add


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to