Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55930526 --- Diff: storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java --- @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing; + +import com.google.common.collect.Lists; +import org.apache.storm.coordination.BatchOutputCollector; +import org.apache.storm.trident.operation.Aggregator; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.trident.windowing.strategy.WindowStrategy; +import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory; +import org.apache.storm.windowing.EvictionPolicy; +import org.apache.storm.windowing.TriggerPolicy; +import org.apache.storm.windowing.WindowLifecycleListener; +import org.apache.storm.windowing.WindowManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Basic functionality to manage trident tuple events using {@code WindowManager} and {@code WindowsStore} for storing + * tuples and triggers related information. 
+ * + */ +public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager { + private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class); + + protected final WindowManager<T> windowManager; + protected final Aggregator aggregator; + protected final BatchOutputCollector delegateCollector; + protected final String windowTaskId; + protected final WindowsStore windowStore; + + protected final Set<String> activeBatches = new HashSet<>(); + protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>(); + protected final AtomicInteger triggerId = new AtomicInteger(); + private final String windowTriggerCountId; + private final TriggerPolicy<T> triggerPolicy; + + public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, + Aggregator aggregator, BatchOutputCollector delegateCollector) { + this.windowTaskId = windowTaskId; + this.windowStore = windowStore; + this.aggregator = aggregator; + this.delegateCollector = delegateCollector; + + windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId; + + windowManager = new WindowManager<>(new TridentWindowLifeCycleListener()); + + WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig); + EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy(); + windowManager.setEvictionPolicy(evictionPolicy); + triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy); + windowManager.setTriggerPolicy(triggerPolicy); + } + + @Override + public void prepare() { + preInitialize(); + + initialize(); + + postInitialize(); + } + + private void preInitialize() { + log.debug("Getting current trigger count for this component/task"); + // get trigger count value from store + Object result = windowStore.get(windowTriggerCountId); + Integer currentCount = 0; + if(result == null) { + log.info("No current trigger count in windows store."); + } 
else { + currentCount = (Integer) result + 1; + } + windowStore.put(windowTriggerCountId, currentCount); + triggerId.set(currentCount); + } + + private void postInitialize() { + // start trigger once the initialization is done. + triggerPolicy.start(); + } + + /** + * Load and initialize any resources into window manager before windowing for component/task is activated. + */ + protected abstract void initialize(); + + /** + * Listener to reeive any activation/expiry of windowing events and take further action on them. + */ + class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> { + + @Override + public void onExpiry(List<T> expiredEvents) { + log.debug("onExpiry is invoked"); + onTuplesExpired(expiredEvents); + } + + @Override + public void onActivation(List<T> events, List<T> newEvents, List<T> expired) { + log.debug("onActivation is invoked with events size: {}", events.size()); + // trigger occurred, create an aggregation and keep them in store + int currentTriggerId = triggerId.incrementAndGet(); + execAggregatorAndStoreResult(currentTriggerId, events); + } + } + + /** + * Handle expired tuple events which can be removing from cache or store. 
+ * + * @param expiredEvents + */ + protected abstract void onTuplesExpired(List<T> expiredEvents); + + private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) { + List<TridentTuple> resultTuples = getTridentTuples(tupleEvents); + + // run aggregator to compute the result + AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector); + Object state = aggregator.init(currentTriggerId, collector); + for (TridentTuple resultTuple : resultTuples) { + aggregator.aggregate(state, resultTuple, collector); + } + aggregator.complete(state, collector); + + List<List<Object>> resultantAggregatedValue = collector.values; + + ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1), + new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue)); + windowStore.putAll(entries); + + pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue)); + } + + /** + * Return {@code TridentTuple}s from given {@code tupleEvents}. + * @param tupleEvents + * @return + */ + protected abstract List<TridentTuple> getTridentTuples(List<T> tupleEvents); + + /** + * This {@code TridentCollector} accumulates all the values emitted. 
+ */ + static class AccumulatedTuplesCollector implements TridentCollector { + + final List<List<Object>> values = new ArrayList<>(); + private final BatchOutputCollector delegateCollector; + + public AccumulatedTuplesCollector(BatchOutputCollector delegateCollector) { + this.delegateCollector = delegateCollector; + } + + @Override + public void emit(List<Object> values) { + this.values.add(values); + } + + @Override + public void reportError(Throwable t) { + delegateCollector.reportError(t); + } + + } + + static class TriggerResult { + final int id; + final List<List<Object>> result; + + public TriggerResult(int id, List<List<Object>> result) { + this.id = id; + this.result = result; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof TriggerResult)) return false; + + TriggerResult that = (TriggerResult) o; + + return id == that.id; + + } + + @Override + public int hashCode() { + return id; + } + + @Override + public String toString() { + return "TriggerResult{" + + "id=" + id + + ", result=" + result + + '}'; + } + } + + public Queue<TriggerResult> getPendingTriggers() { + return pendingTriggers; + } + + public void shutdown() { + try { + log.info("window manager [{}] is being shutdown", windowManager); + windowManager.shutdown(); + } finally { --- End diff -- Shouldn't we log an exception explaining why the shutdown didn't happen properly?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---