Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763573

--- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java ---
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them as needed.
+ */
+public class PersistentWindowedBoltExecutor<T extends State> extends WindowedBoltExecutor implements IStatefulBolt<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+    private final IStatefulWindowedBolt<T> statefulWindowedBolt;
+    private transient TopologyContext topologyContext;
+    private transient OutputCollector outputCollector;
+    private transient WindowState<Tuple> state;
+    private transient boolean stateInitialized;
+    private transient boolean prePrepared;
+
+    public PersistentWindowedBoltExecutor(IStatefulWindowedBolt<T> bolt) {
+        super(bolt);
+        statefulWindowedBolt = bolt;
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+        List<String> registrations = (List<String>) topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
+        registrations.add(ConcurrentLinkedQueue.class.getName());
+        registrations.add(LinkedList.class.getName());
+        registrations.add(AtomicInteger.class.getName());
+        registrations.add(EventImpl.class.getName());
+        registrations.add(WindowPartition.class.getName());
+        registrations.add(DefaultEvictionContext.class.getName());
+        topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
+        prepare(topoConf, context, collector,
+            getWindowState(topoConf, context),
+            getPartitionState(topoConf, context),
+            getWindowSystemState(topoConf, context));
+    }
+
+    // package access for unit tests
+    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
+                 KeyValueState<Long, WindowPartition<Tuple>> windowState,
+                 KeyValueState<String, Deque<Long>> partitionState,
+                 KeyValueState<String, Optional<?>> windowSystemState) {
+        init(topoConf, context, collector, windowState, partitionState, windowSystemState);
+        doPrepare(topoConf, context, new NoAckOutputCollector(collector), state, true);
+        Map<String, Optional<?>> wstate = new HashMap<>();
+        windowSystemState.forEach(s -> wstate.put(s.getKey(), s.getValue()));
+        restoreState(wstate);
+    }
+
+    @Override
+    protected void start() {
+        if (stateInitialized) {
+            super.start();
+        } else {
+            LOG.debug("Will invoke start after state is initialized");
+        }
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        if (!stateInitialized) {
+            throw new IllegalStateException("execute invoked before initState with input tuple " + input);
+        }
+        super.execute(input);
+        // StatefulBoltExecutor does the actual ack when the state is saved.
+        outputCollector.ack(input);
+    }
+
+    @Override
+    public void initState(T state) {
+        if (stateInitialized) {
+            LOG.warn("State is already initialized. Ignoring initState");
+        } else {
+            statefulWindowedBolt.initState(state);
+            stateInitialized = true;
+            start();
+        }
+    }
+
+    @Override
+    public void prePrepare(long txid) {
+        if (stateInitialized) {
+            LOG.debug("Prepare streamState, txid {}", txid);
+            state.prepareCommit(txid);
+            prePrepared = true;
+        } else {
+            LOG.warn("Cannot prepare before initState");
+        }
+    }
+
+    @Override
+    public void preCommit(long txid) {
+        if (prePrepared) {
+            LOG.debug("Commit streamState, txid {}", txid);
+            state.commit(txid);
+        } else {
+            LOG.debug("Ignoring preCommit and not committing streamState.");
+        }
+    }
+
+    @Override
+    public void preRollback() {
+        LOG.debug("Rollback streamState, stateInitialized {}", stateInitialized);
+        state.rollback();
+    }
+
+    @Override
+    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
+        return new WindowLifecycleListener<Tuple>() {
+            @Override
+            public void onExpiry(List<Tuple> events) {
+                /*
+                 * NO-OP: the events are ack-ed in execute
+                 */
+            }
+
+            @Override
+            public void onActivation(Supplier<Iterator<Tuple>> eventsIt,
+                                     Supplier<Iterator<Tuple>> newEventsIt,
+                                     Supplier<Iterator<Tuple>> expiredIt,
+                                     Long timestamp) {
+                /*
+                 * Here we don't set the tuples in windowedOutputCollector's context and emit un-anchored.
+                 * The checkpoint tuple will trigger a checkpoint in the receiver with the emitted tuples.
+                 */
+                boltExecute(eventsIt, newEventsIt, expiredIt, timestamp);
+                state.clearIteratorPins();
+            }
+        };
+    }
+
+    private void init(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
+                      KeyValueState<Long, WindowPartition<Tuple>> windowState,
+                      KeyValueState<String, Deque<Long>> partitionState,
+                      KeyValueState<String, Optional<?>> windowSystemState) {
+        topologyContext = context;
+        outputCollector = collector;
+        state = new WindowState<>(windowState, partitionState, windowSystemState, this::getState,
+            statefulWindowedBolt.maxEventsInMemory());
+    }
+
+    private KeyValueState<Long, WindowPartition<Tuple>> getWindowState(Map<String, Object> topoConf, TopologyContext context) {
+        String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window";
+        return (KeyValueState<Long, WindowPartition<Tuple>>) StateFactory.getState(namespace, topoConf, context);
+    }
+
+    private KeyValueState<String, Deque<Long>> getPartitionState(Map<String, Object> topoConf, TopologyContext context) {
+        String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window-partitions";
+        return (KeyValueState<String, Deque<Long>>) StateFactory.getState(namespace, topoConf, context);
+    }
+
+    private KeyValueState<String, Optional<?>> getWindowSystemState(Map<String, Object> topoConf, TopologyContext context) {
+        String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window-systemstate";
+        return (KeyValueState<String, Optional<?>>) StateFactory.getState(namespace, topoConf, context);
+    }
+
+    // a wrapper around the window related states that are checkpointed
+    private static class WindowState<T> extends AbstractCollection<Event<T>> {
+        // number of events per window-partition
+        private static final int PARTITION_SZ = 1000;
+        private static final int MIN_PARTITIONS = 10;
+        private static final String PARTITION_IDS_KEY = "pk";
+        private final KeyValueState<String, Deque<Long>> partitionIds;
+        private final KeyValueState<Long, WindowPartition<T>> windowPartitions;
+        private final KeyValueState<String, Optional<?>> windowSystemState;
+        // ordered partition keys
+        private Deque<Long> pids;
+        private volatile long latestPartitionId;
+        private WindowPartitionCache<Long, WindowPartition<T>> cache;
+        private Supplier<Map<String, Optional<?>>> windowSystemStateSupplier;
+        private final ReentrantLock partitionIdsLock = new ReentrantLock(true);
+        private final ReentrantLock windowPartitionsLock = new ReentrantLock(true);
+        private final long maxEventsInMemory;
+        private WindowPartition<T> latestPartition;
+        private Set<Long> iteratorPins = new HashSet<>();
+
+        WindowState(KeyValueState<Long, WindowPartition<T>> windowPartitions,
+                    KeyValueState<String, Deque<Long>> partitionIds,
+                    KeyValueState<String, Optional<?>> windowSystemState,
+                    Supplier<Map<String, Optional<?>>> windowSystemStateSupplier,
+                    long maxEventsInMemory) {
+            this.windowPartitions = windowPartitions;
+            this.partitionIds = partitionIds;
+            this.windowSystemState = windowSystemState;
+            this.windowSystemStateSupplier = windowSystemStateSupplier;
+            this.maxEventsInMemory = Math.max(PARTITION_SZ * MIN_PARTITIONS, maxEventsInMemory);
+            initCache();
+            initPartitions();
+        }
+
+        @Override
+        public boolean add(Event<T> event) {
+            if (latestPartition.size() >= PARTITION_SZ) {
+                cache.unpin(latestPartition.getId());
+                latestPartition = getPinnedPartition(getNextPartitionId());
+            }
+            latestPartition.add(event);
+            return true;
+        }
+
+        @Override
+        public Iterator<Event<T>> iterator() {
+
+            return new Iterator<Event<T>>() {
+                private Iterator<Long> ids = getIds();
+                private Iterator<Event<T>> current = emptyIterator();
+                private Iterator<Event<T>> removeFrom;
+                private WindowPartition<T> curPartition;
+
+                private Iterator<Long> getIds() {
+                    try {
+                        partitionIdsLock.lock();
+                        LOG.debug("Iterator pids: {}", pids);
+                        return new ArrayList<>(pids).iterator();
+                    } finally {
+                        partitionIdsLock.unlock();
+                    }
+                }
+
+                @Override
+                public void remove() {
+                    if (removeFrom == null) {
+                        throw new IllegalStateException("No calls to next() since last call to remove()");
+                    }
+                    removeFrom.remove();
+                    removeFrom = null;
+                }
+
+                @Override
+                public boolean hasNext() {
+                    boolean curHasNext = current.hasNext();
+                    while (!curHasNext && ids.hasNext()) {
+                        if (curPartition != null) {
+                            unpin(curPartition.getId());
+                        }
+                        curPartition = getPinnedPartition(ids.next());
+                        iteratorPins.add(curPartition.getId());
+                        if (curPartition != null) {
+                            current = curPartition.iterator();
+                            curHasNext = current.hasNext();
+                        }
+                    }
+                    // un-pin the last partition
+                    if (!curHasNext && curPartition != null) {
+                        unpin(curPartition.getId());
+                        curPartition = null;
+                    }
+                    return curHasNext;
+                }
+
+                @Override
+                public Event<T> next() {
+                    if (!hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+                    removeFrom = current;
+                    return current.next();
+                }
+
+                private void unpin(long id) {
+                    cache.unpin(id);
+                    iteratorPins.remove(id);
+                }
+            };
+        }
+
+        void clearIteratorPins() {
+            LOG.debug("clearIteratorPins '{}'", iteratorPins);
+            Iterator<Long> it = iteratorPins.iterator();
+            while (it.hasNext()) {
+                cache.unpin(it.next());
+                it.remove();
+            }
+        }
+
+        @Override
+        public int size() {
+            throw new UnsupportedOperationException();
+        }
+
+        void prepareCommit(long txid) {
+            flush();
+            partitionIds.prepareCommit(txid);
+            windowPartitions.prepareCommit(txid);
+            windowSystemState.prepareCommit(txid);
+        }
+
+        void commit(long txid) {
+            partitionIds.commit(txid);
+            windowPartitions.commit(txid);
+            windowSystemState.commit(txid);
+        }
+
+        void rollback() {
+            partitionIds.rollback();
+            windowPartitions.rollback();
+            windowSystemState.rollback();
+        }
+
+        private void initPartitions() {
+            pids = partitionIds.get(PARTITION_IDS_KEY, new LinkedList<>());
+            if (pids.isEmpty()) {
+                pids.add(0L);
+                partitionIds.put(PARTITION_IDS_KEY, pids);
+            } else {
+                latestPartitionId = pids.peekLast();
--- End diff --

Yes, it is initialized to 0 by default since it is a member variable; anyway, I will remove the else branch to make it clearer.
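For illustration, a minimal sketch of what the simplified `initPartitions()` could look like (not the committed change; it assumes the surrounding fields and the rest of the method stay as in this diff):

```java
private void initPartitions() {
    // load the stored partition ids, falling back to an empty deque on the first run
    pids = partitionIds.get(PARTITION_IDS_KEY, new LinkedList<>());
    if (pids.isEmpty()) {
        pids.add(0L);
        partitionIds.put(PARTITION_IDS_KEY, pids);
    }
    // pids now always has at least one id, so derive latestPartitionId explicitly
    // instead of relying on the member variable's implicit 0 default
    latestPartitionId = pids.peekLast();
}
```

Behavior stays the same for the empty case (latestPartitionId ends up 0), while the non-empty case is no longer hidden behind an else branch.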