Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r55930526
  
    --- Diff: 
storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
 ---
    @@ -0,0 +1,241 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.trident.windowing;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.storm.coordination.BatchOutputCollector;
    +import org.apache.storm.trident.operation.Aggregator;
    +import org.apache.storm.trident.operation.TridentCollector;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +import org.apache.storm.trident.windowing.config.WindowConfig;
    +import org.apache.storm.trident.windowing.strategy.WindowStrategy;
    +import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
    +import org.apache.storm.windowing.EvictionPolicy;
    +import org.apache.storm.windowing.TriggerPolicy;
    +import org.apache.storm.windowing.WindowLifecycleListener;
    +import org.apache.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Queue;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Basic functionality to manage trident tuple events using {@code 
WindowManager} and {@code WindowsStore} for storing
    + * tuples and triggers related information.
    + *
    + */
    +public abstract class AbstractTridentWindowManager<T> implements 
ITridentWindowManager {
    +    private static final Logger log = 
LoggerFactory.getLogger(AbstractTridentWindowManager.class);
    +
    +    protected final WindowManager<T> windowManager;
    +    protected final Aggregator aggregator;
    +    protected final BatchOutputCollector delegateCollector;
    +    protected final String windowTaskId;
    +    protected final WindowsStore windowStore;
    +
    +    protected final Set<String> activeBatches = new HashSet<>();
    +    protected final Queue<TriggerResult> pendingTriggers = new 
ConcurrentLinkedQueue<>();
    +    protected final AtomicInteger triggerId = new AtomicInteger();
    +    private final String windowTriggerCountId;
    +    private final TriggerPolicy<T> triggerPolicy;
    +
    +    public AbstractTridentWindowManager(WindowConfig windowConfig, String 
windowTaskId, WindowsStore windowStore,
    +                                        Aggregator aggregator, 
BatchOutputCollector delegateCollector) {
    +        this.windowTaskId = windowTaskId;
    +        this.windowStore = windowStore;
    +        this.aggregator = aggregator;
    +        this.delegateCollector = delegateCollector;
    +
    +        windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX 
+ windowTaskId;
    +
    +        windowManager = new WindowManager<>(new 
TridentWindowLifeCycleListener());
    +
    +        WindowStrategy<T> windowStrategy = 
WindowStrategyFactory.create(windowConfig);
    +        EvictionPolicy<T> evictionPolicy = 
windowStrategy.getEvictionPolicy();
    +        windowManager.setEvictionPolicy(evictionPolicy);
    +        triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, 
evictionPolicy);
    +        windowManager.setTriggerPolicy(triggerPolicy);
    +    }
    +
    +    @Override
    +    public void prepare() {
    +        preInitialize();
    +
    +        initialize();
    +
    +        postInitialize();
    +    }
    +
    +    private void preInitialize() {
    +        log.debug("Getting current trigger count for this component/task");
    +        // get trigger count value from store
    +        Object result = windowStore.get(windowTriggerCountId);
    +        Integer currentCount = 0;
    +        if(result == null) {
    +            log.info("No current trigger count in windows store.");
    +        } else {
    +            currentCount = (Integer) result + 1;
    +        }
    +        windowStore.put(windowTriggerCountId, currentCount);
    +        triggerId.set(currentCount);
    +    }
    +
    +    private void postInitialize() {
    +        // start trigger once the initialization is done.
    +        triggerPolicy.start();
    +    }
    +
    +    /**
    +     * Load and initialize any resources into window manager before 
windowing for component/task is activated.
    +     */
    +    protected abstract void initialize();
    +
    +    /**
    +     * Listener to reeive any activation/expiry of windowing events and 
take further action on them.
    +     */
    +    class TridentWindowLifeCycleListener implements 
WindowLifecycleListener<T> {
    +
    +        @Override
    +        public void onExpiry(List<T> expiredEvents) {
    +            log.debug("onExpiry is invoked");
    +            onTuplesExpired(expiredEvents);
    +        }
    +
    +        @Override
    +        public void onActivation(List<T> events, List<T> newEvents, 
List<T> expired) {
    +            log.debug("onActivation is invoked with events size: {}", 
events.size());
    +            // trigger occurred, create an aggregation and keep them in 
store
    +            int currentTriggerId = triggerId.incrementAndGet();
    +            execAggregatorAndStoreResult(currentTriggerId, events);
    +        }
    +    }
    +
    +    /**
    +     * Handle expired tuple events which can be removing from cache or 
store.
    +     *
    +     * @param expiredEvents
    +     */
    +    protected abstract void onTuplesExpired(List<T> expiredEvents);
    +
    +    private void execAggregatorAndStoreResult(int currentTriggerId, 
List<T> tupleEvents) {
    +        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);
    +
    +        // run aggregator to compute the result
    +        AccumulatedTuplesCollector collector = new 
AccumulatedTuplesCollector(delegateCollector);
    +        Object state = aggregator.init(currentTriggerId, collector);
    +        for (TridentTuple resultTuple : resultTuples) {
    +            aggregator.aggregate(state, resultTuple, collector);
    +        }
    +        aggregator.complete(state, collector);
    +
    +        List<List<Object>> resultantAggregatedValue = collector.values;
    +
    +        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new 
WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
    +                new 
WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId,
 currentTriggerId), resultantAggregatedValue));
    +        windowStore.putAll(entries);
    +
    +        pendingTriggers.add(new TriggerResult(currentTriggerId, 
resultantAggregatedValue));
    +    }
    +
    +    /**
    +     * Return {@code TridentTuple}s from given {@code tupleEvents}.
    +     * @param tupleEvents
    +     * @return
    +     */
    +    protected abstract List<TridentTuple> getTridentTuples(List<T> 
tupleEvents);
    +
    +    /**
    +     * This {@code TridentCollector} accumulates all the values emitted.
    +     */
    +    static class AccumulatedTuplesCollector implements TridentCollector {
    +
    +        final List<List<Object>> values = new ArrayList<>();
    +        private final BatchOutputCollector delegateCollector;
    +
    +        public AccumulatedTuplesCollector(BatchOutputCollector 
delegateCollector) {
    +            this.delegateCollector = delegateCollector;
    +        }
    +
    +        @Override
    +        public void emit(List<Object> values) {
    +            this.values.add(values);
    +        }
    +
    +        @Override
    +        public void reportError(Throwable t) {
    +            delegateCollector.reportError(t);
    +        }
    +
    +    }
    +
    +    static class TriggerResult {
    +        final int id;
    +        final List<List<Object>> result;
    +
    +        public TriggerResult(int id, List<List<Object>> result) {
    +            this.id = id;
    +            this.result = result;
    +        }
    +
    +        @Override
    +        public boolean equals(Object o) {
    +            if (this == o) return true;
    +            if (!(o instanceof TriggerResult)) return false;
    +
    +            TriggerResult that = (TriggerResult) o;
    +
    +            return id == that.id;
    +
    +        }
    +
    +        @Override
    +        public int hashCode() {
    +            return id;
    +        }
    +
    +        @Override
    +        public String toString() {
    +            return "TriggerResult{" +
    +                    "id=" + id +
    +                    ", result=" + result +
    +                    '}';
    +        }
    +    }
    +
    +    public Queue<TriggerResult> getPendingTriggers() {
    +        return pendingTriggers;
    +    }
    +
    +    public void shutdown() {
    +        try {
    +            log.info("window manager [{}] is being shutdown", 
windowManager);
    +            windowManager.shutdown();
    +        } finally {
    --- End diff --
    
    shouldn't we log an exception why the shutdown didn't happened properly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to