Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/855#discussion_r43976655
--- Diff: storm-core/src/jvm/backtype/storm/windowing/WindowManager.java ---
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static backtype.storm.topology.base.BaseWindowedBolt.Count;
+import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * Tracks a window of events and fires {@link WindowLifecycleListener}
callbacks
+ * on expiry of events or activation of the window due to {@link
TriggerPolicy}.
+ *
+ * @param <T> the type of event in the window.
+ */
+public class WindowManager<T> implements TriggerHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(WindowManager.class);
+
+ /**
+ * Expire old events every EXPIRE_EVENTS_THRESHOLD to
+ * keep the window size in check.
+ */
+ public static final int EXPIRE_EVENTS_THRESHOLD = 100;
+
+ private WindowLifecycleListener<T> windowLifecycleListener;
+ private ConcurrentLinkedQueue<Event<T>> window;
+ private EvictionPolicy<T> evictionPolicy;
+ private TriggerPolicy<T> triggerPolicy;
+ private List<T> expiredEvents;
+ private Set<Event<T>> prevWindowEvents;
+ private AtomicInteger eventsSinceLastExpiry;
+ private ReentrantLock lock;
+
+ public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
+ windowLifecycleListener = lifecycleListener;
+ window = new ConcurrentLinkedQueue<>();
+ expiredEvents = new ArrayList<>();
+ prevWindowEvents = new HashSet<>();
+ eventsSinceLastExpiry = new AtomicInteger();
+ lock = new ReentrantLock(true);
+ }
+
+ public void setWindowLength(Count count) {
+ this.evictionPolicy = new CountEvictionPolicy<>(count.value);
+ }
+
+ public void setWindowLength(Duration duration) {
+ this.evictionPolicy = new TimeEvictionPolicy<>(duration.value);
+ }
+
+ public void setSlidingInterval(Count count) {
+ this.triggerPolicy = new CountTriggerPolicy<>(count.value, this);
+ }
+
+ public void setSlidingInterval(Duration duration) {
+ this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this);
+ }
+
+ /**
+ * Add an event into the window, with {@link
System#currentTimeMillis()} as
+ * the tracking ts.
+ *
+ * @param event the event to add
+ */
+ public void add(T event) {
+ add(event, System.currentTimeMillis());
+ }
+
+ /**
+ * Add an event into the window, with the given ts as the tracking ts.
+ *
+ * @param event the event to track
+ * @param ts the timestamp
+ */
+ public void add(T event, long ts) {
+ Event<T> windowEvent = new EventImpl<T>(event, ts);
+ window.add(windowEvent);
+ track(windowEvent);
+ compactWindow();
+ }
+
+ /**
+ * The callback invoked by the trigger policy.
+ */
+ @Override
+ public void onTrigger() {
+ List<Event<T>> windowEvents = new ArrayList<>();
+ List<T> expired = null;
+ try {
+ lock.lock();
+ /*
+ * scan the entire window to handle out of order events in
+ * the case of time based windows.
+ */
+ expireEvents(true, windowEvents);
+ expired = new ArrayList<>(expiredEvents);
+ expiredEvents.clear();
+ } finally {
+ lock.unlock();
+ }
+ List<T> events = new ArrayList<>();
+ List<T> newEvents = new ArrayList<>();
+ for (Event<T> event : windowEvents) {
+ events.add(event.get());
+ if (!prevWindowEvents.contains(event)) {
+ newEvents.add(event.get());
+ }
+ }
+ prevWindowEvents.clear();
+ prevWindowEvents.addAll(windowEvents);
+ LOG.debug("invoking windowLifecycleListener onActivation, [{}]
events in window.", windowEvents.size());
+ windowLifecycleListener.onActivation(events, newEvents, expired);
+ triggerPolicy.reset();
+ }
+
+ public void shutdown() {
+ LOG.debug("Shutting down WindowManager");
+ if (triggerPolicy != null) {
+ triggerPolicy.shutdown();
+ }
+ }
+
+ /**
+ * expires events that fall out of the window every
+ * EXPIRE_EVENTS_THRESHOLD so that the window does not grow
+ * too big.
+ */
+ private void compactWindow() {
+ if (eventsSinceLastExpiry.incrementAndGet() >=
EXPIRE_EVENTS_THRESHOLD) {
+ expireEvents(false, null);
+ }
+ }
+
+ /**
+ * feed the event to the eviction and trigger policies
+ * for bookkeeping and optionally firing the trigger.
+ */
+ private void track(Event<T> windowEvent) {
+ evictionPolicy.track(windowEvent);
+ triggerPolicy.track(windowEvent);
+ }
+
+ /**
+ * Expire events from the window, using the expiration policy to check
+ * if the event should be evicted or not.
+ *
+ * @param fullScan if set, will scan the entire window. if not set,
will stop
+ * as soon as an event not satisfying the expiration
policy is found.
+ * @param remaining the list of remaining events in the window after
expiry.
+ */
+ private void expireEvents(boolean fullScan, List<Event<T>> remaining) {
--- End diff --
We dont need the remaining list always. The `expireEvents` is also called
from `compactWindow` where we dont need the result and pass null. Another way
would be to pass a boolean flag to control this and return the result. The
`expiredEvents` is kept as a member variable since its updated during both
compaction and trigger.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---