Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2218#discussion_r129791075
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java ---
    @@ -0,0 +1,596 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.topology;
    +
    +import java.util.AbstractCollection;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Deque;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Supplier;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.state.KeyValueState;
    +import org.apache.storm.state.State;
    +import org.apache.storm.state.StateFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.base.BaseWindowedBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.windowing.DefaultEvictionContext;
    +import org.apache.storm.windowing.Event;
    +import org.apache.storm.windowing.EventImpl;
    +import org.apache.storm.windowing.WindowLifecycleListener;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static java.util.Collections.emptyIterator;
    +import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
    +import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
    +import static 
org.apache.storm.topology.WindowPartitionCache.RemovalListener;
    +
    +/**
    + * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses 
state and the underlying
    + * checkpointing mechanisms to save the tuples in window to state. The 
tuples are also kept in-memory
    + * by transparently caching the window partitions and checkpointing them 
as needed.
    + */
    +public class PersistentWindowedBoltExecutor<T extends State> extends 
WindowedBoltExecutor implements IStatefulBolt<T> {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
    +    private final IStatefulWindowedBolt<T> statefulWindowedBolt;
    +    private transient TopologyContext topologyContext;
    +    private transient OutputCollector outputCollector;
    +    private transient WindowState<Tuple> state;
    +    private transient boolean stateInitialized;
    +    private transient boolean prePrepared;
    +
    +    public PersistentWindowedBoltExecutor(IStatefulWindowedBolt<T> bolt) {
    +        super(bolt);
    +        statefulWindowedBolt = bolt;
    +    }
    +
    +    @Override
    +    public void prepare(Map<String, Object> topoConf, TopologyContext 
context, OutputCollector collector) {
    +        List<String> registrations = (List<String>) 
topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
    +        registrations.add(ConcurrentLinkedQueue.class.getName());
    +        registrations.add(LinkedList.class.getName());
    +        registrations.add(AtomicInteger.class.getName());
    +        registrations.add(EventImpl.class.getName());
    +        registrations.add(WindowPartition.class.getName());
    +        registrations.add(DefaultEvictionContext.class.getName());
    +        topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
    +        prepare(topoConf, context, collector,
    +            getWindowState(topoConf, context),
    +            getPartitionState(topoConf, context),
    +            getWindowSystemState(topoConf, context));
    +    }
    +
    +    @Override
    +    protected void validate(Map<String, Object> topoConf,
    +                            BaseWindowedBolt.Count windowLengthCount,
    +                            BaseWindowedBolt.Duration windowLengthDuration,
    +                            BaseWindowedBolt.Count slidingIntervalCount,
    +                            BaseWindowedBolt.Duration 
slidingIntervalDuration) {
    +        if (windowLengthCount == null && windowLengthDuration == null) {
    +            throw new IllegalArgumentException("Window length is not 
specified");
    +        }
    +        int interval = getCheckpointIntervalMillis(topoConf);
    +        int timeout = getTopologyTimeoutMillis(topoConf);
    +        if (interval > timeout) {
    +            throw new 
IllegalArgumentException(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL + interval +
    +                " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                " value " + timeout);
    +        }
    +    }
    +
    +    private int getCheckpointIntervalMillis(Map<String, Object> topoConf) {
    +        int checkpointInterval = Integer.MAX_VALUE;
    +        if (topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL) != 
null) {
    +            checkpointInterval = ((Number) 
topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)).intValue();
    +        }
    +        return checkpointInterval;
    +    }
    +
    +    // package access for unit tests
    +    void prepare(Map<String, Object> topoConf, TopologyContext context, 
OutputCollector collector,
    +                 KeyValueState<Long, WindowPartition<Tuple>> windowState,
    +                 KeyValueState<String, Deque<Long>> partitionState,
    +                 KeyValueState<String, Optional<?>> windowSystemState) {
    +        init(topoConf, context, collector, windowState, partitionState, 
windowSystemState);
    +        doPrepare(topoConf, context, new NoAckOutputCollector(collector), 
state, true);
    +        Map<String, Optional<?>> wstate = new HashMap<>();
    +        windowSystemState.forEach(s -> wstate.put(s.getKey(), 
s.getValue()));
    +        restoreState(wstate);
    +    }
    +
    +    @Override
    +    protected void start() {
    +        if (stateInitialized) {
    +            super.start();
    +        } else {
    +            LOG.debug("Will invoke start after state is initialized");
    +        }
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        if (!stateInitialized) {
    +            throw new IllegalStateException("execute invoked before 
initState with input tuple " + input);
    +        }
    +        super.execute(input);
    +        // StatefulBoltExecutor does the actual ack when the state is 
saved.
    +        outputCollector.ack(input);
    +    }
    +
    +    @Override
    +    public void initState(T state) {
    +        if (stateInitialized) {
    +            String msg = "initState invoked when the state is already 
initialized";
    +            LOG.warn(msg);
    +            throw new IllegalStateException(msg);
    +        } else {
    +            statefulWindowedBolt.initState(state);
    +            stateInitialized = true;
    +            start();
    +        }
    +    }
    +
    +    @Override
    +    public void prePrepare(long txid) {
    +        if (stateInitialized) {
    +            LOG.debug("Prepare streamState, txid {}", txid);
    +            state.prepareCommit(txid);
    +            prePrepared = true;
    +        } else {
    +            String msg = "Cannot prepare before initState";
    +            LOG.warn(msg);
    +            throw new IllegalStateException(msg);
    +        }
    +    }
    +
    +    @Override
    +    public void preCommit(long txid) {
    +        // preCommit can be invoked during recovery before the state is 
initialized
    +        if (prePrepared || !stateInitialized) {
    +            LOG.debug("Commit streamState, txid {}", txid);
    +            state.commit(txid);
    +        } else {
    +            String msg = "preCommit before prePrepare in initialized 
state";
    +            LOG.warn(msg);
    +            throw new IllegalStateException(msg);
    +        }
    +    }
    +
    +    @Override
    +    public void preRollback() {
    +        LOG.debug("Rollback streamState, stateInitialized {}", 
stateInitialized);
    +        state.rollback();
    +    }
    +
    +    @Override
    +    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
    +        return new WindowLifecycleListener<Tuple>() {
    +            @Override
    +            public void onExpiry(List<Tuple> events) {
    +                /*
    +                 * NO-OP: the events are ack-ed in execute
    +                 */
    +            }
    +
    +            @Override
    +            public void onActivation(Supplier<Iterator<Tuple>> eventsIt,
    +                                     Supplier<Iterator<Tuple>> newEventsIt,
    +                                     Supplier<Iterator<Tuple>> expiredIt,
    +                                     Long timestamp) {
    +                /*
    +                 * Here we don't set the tuples in 
windowedOutputCollector's context and emit un-anchored.
    +                 * The checkpoint tuple will trigger a checkpoint in the 
receiver with the emitted tuples.
    +                 */
    +                boltExecute(eventsIt, newEventsIt, expiredIt, timestamp);
    +                state.clearIteratorPins();
    +            }
    +        };
    +    }
    +
    +    private void init(Map<String, Object> topoConf, TopologyContext 
context, OutputCollector collector,
    +                      KeyValueState<Long, WindowPartition<Tuple>> 
windowState,
    +                      KeyValueState<String, Deque<Long>> partitionState,
    +                      KeyValueState<String, Optional<?>> 
windowSystemState) {
    +        topologyContext = context;
    +        outputCollector = collector;
    +        state = new WindowState<>(windowState, partitionState, 
windowSystemState, this::getState,
    +            statefulWindowedBolt.maxEventsInMemory());
    +    }
    +
    +    private KeyValueState<Long, WindowPartition<Tuple>> 
getWindowState(Map<String, Object> topoConf, TopologyContext context) {
    +        String namespace = context.getThisComponentId() + "-" + 
context.getThisTaskId() + "-window";
    +        return (KeyValueState<Long, WindowPartition<Tuple>>) 
StateFactory.getState(namespace, topoConf, context);
    +    }
    +
    +    private KeyValueState<String, Deque<Long>> 
getPartitionState(Map<String, Object> topoConf, TopologyContext context) {
    +        String namespace = context.getThisComponentId() + "-" + 
context.getThisTaskId() + "-window-partitions";
    +        return (KeyValueState<String, Deque<Long>>) 
StateFactory.getState(namespace, topoConf, context);
    +    }
    +
    +    private KeyValueState<String, Optional<?>> 
getWindowSystemState(Map<String, Object> topoConf, TopologyContext context) {
    +        String namespace = context.getThisComponentId() + "-" + 
context.getThisTaskId() + "-window-systemstate";
    +        return (KeyValueState<String, Optional<?>>) 
StateFactory.getState(namespace, topoConf, context);
    +    }
    +
    +    // a wrapper around the window related states that are checkpointed
    +    private static class WindowState<T> extends 
AbstractCollection<Event<T>> {
    --- End diff --
    
    Can we move this class out to package level (maybe into a new package) and
add unit tests for it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to