[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192124#comment-15192124 ]
ASF GitHub Bot commented on STORM-676: -------------------------------------- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55932046 --- Diff: storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java --- @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing; + +import com.google.common.collect.Lists; +import org.apache.storm.coordination.BatchOutputCollector; +import org.apache.storm.trident.operation.Aggregator; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.trident.windowing.strategy.WindowStrategy; +import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory; +import org.apache.storm.windowing.EvictionPolicy; +import org.apache.storm.windowing.TriggerPolicy; +import org.apache.storm.windowing.WindowLifecycleListener; +import org.apache.storm.windowing.WindowManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Basic functionality to manage trident tuple events using {@code WindowManager} and {@code WindowsStore} for storing + * tuples and triggers related information. 
+ * + */ +public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager { + private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class); + + protected final WindowManager<T> windowManager; + protected final Aggregator aggregator; + protected final BatchOutputCollector delegateCollector; + protected final String windowTaskId; + protected final WindowsStore windowStore; + + protected final Set<String> activeBatches = new HashSet<>(); + protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>(); + protected final AtomicInteger triggerId = new AtomicInteger(); + private final String windowTriggerCountId; + private final TriggerPolicy<T> triggerPolicy; + + public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, + Aggregator aggregator, BatchOutputCollector delegateCollector) { + this.windowTaskId = windowTaskId; + this.windowStore = windowStore; + this.aggregator = aggregator; + this.delegateCollector = delegateCollector; + + windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId; + + windowManager = new WindowManager<>(new TridentWindowLifeCycleListener()); + + WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig); + EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy(); + windowManager.setEvictionPolicy(evictionPolicy); + triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy); + windowManager.setTriggerPolicy(triggerPolicy); + } + + @Override + public void prepare() { + preInitialize(); + + initialize(); + + postInitialize(); + } + + private void preInitialize() { + log.debug("Getting current trigger count for this component/task"); + // get trigger count value from store + Object result = windowStore.get(windowTriggerCountId); + Integer currentCount = 0; + if(result == null) { + log.info("No current trigger count in windows store."); + } 
else { + currentCount = (Integer) result + 1; + } + windowStore.put(windowTriggerCountId, currentCount); + triggerId.set(currentCount); + } + + private void postInitialize() { + // start trigger once the initialization is done. + triggerPolicy.start(); + } + + /** + * Load and initialize any resources into window manager before windowing for component/task is activated. + */ + protected abstract void initialize(); + + /** + * Listener to receive any activation/expiry of windowing events and take further action on them. + */ + class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> { + + @Override + public void onExpiry(List<T> expiredEvents) { + log.debug("onExpiry is invoked"); + onTuplesExpired(expiredEvents); + } + + @Override + public void onActivation(List<T> events, List<T> newEvents, List<T> expired) { + log.debug("onActivation is invoked with events size: {}", events.size()); + // trigger occurred, create an aggregation and keep them in store + int currentTriggerId = triggerId.incrementAndGet(); + execAggregatorAndStoreResult(currentTriggerId, events); + } + } + + /** + * Handle expired tuple events which can be removing from cache or store. 
+ * + * @param expiredEvents + */ + protected abstract void onTuplesExpired(List<T> expiredEvents); + + private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) { + List<TridentTuple> resultTuples = getTridentTuples(tupleEvents); + + // run aggregator to compute the result + AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector); + Object state = aggregator.init(currentTriggerId, collector); + for (TridentTuple resultTuple : resultTuples) { + aggregator.aggregate(state, resultTuple, collector); + } + aggregator.complete(state, collector); + + List<List<Object>> resultantAggregatedValue = collector.values; + + ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1), + new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue)); + windowStore.putAll(entries); + + pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue)); + } + + /** + * Return {@code TridentTuple}s from given {@code tupleEvents}. + * @param tupleEvents + * @return + */ + protected abstract List<TridentTuple> getTridentTuples(List<T> tupleEvents); + + /** + * This {@code TridentCollector} accumulates all the values emitted. 
+ */ + static class AccumulatedTuplesCollector implements TridentCollector { + + final List<List<Object>> values = new ArrayList<>(); + private final BatchOutputCollector delegateCollector; + + public AccumulatedTuplesCollector(BatchOutputCollector delegateCollector) { + this.delegateCollector = delegateCollector; + } + + @Override + public void emit(List<Object> values) { + this.values.add(values); + } + + @Override + public void reportError(Throwable t) { + delegateCollector.reportError(t); + } + + } + + static class TriggerResult { + final int id; + final List<List<Object>> result; + + public TriggerResult(int id, List<List<Object>> result) { + this.id = id; + this.result = result; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof TriggerResult)) return false; + + TriggerResult that = (TriggerResult) o; + + return id == that.id; + + } + + @Override + public int hashCode() { + return id; + } + + @Override + public String toString() { + return "TriggerResult{" + + "id=" + id + + ", result=" + result + + '}'; + } + } + + public Queue<TriggerResult> getPendingTriggers() { + return pendingTriggers; + } + + public void shutdown() { + try { + log.info("window manager [{}] is being shutdown", windowManager); + windowManager.shutdown(); + } finally { --- End diff -- I will add exception logging in WindowTridentProcessor, from which this method is invoked. > Storm Trident support for sliding/tumbling windows > -------------------------------------------------- > > Key: STORM-676 > URL: https://issues.apache.org/jira/browse/STORM-676 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core > Reporter: Sriharsha Chintalapani > Assignee: Satish Duggana > Fix For: 1.0.0, 2.0.0 > > Attachments: StormTrident_windowing_support-676.pdf > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)