[
https://issues.apache.org/jira/browse/STORM-1176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15094677#comment-15094677
]
ASF GitHub Bot commented on STORM-1176:
---------------------------------------
Github user ptgoetz commented on a diff in the pull request:
https://github.com/apache/storm/pull/963#discussion_r49506460
--- Diff: examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java ---
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseStatefulWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.windowing.TupleWindow;
+
+import java.util.Map;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
+
+/**
+ * A simple example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} to
+ * save the state of the windowing operation to avoid re-computation in case of failures.
+ * <p>
+ * The framework internally manages the window boundaries and does not invoke
+ * {@link org.apache.storm.topology.IWindowedBolt#execute(TupleWindow)} for the already evaluated windows in case of restarts
+ * during failures. The {@link org.apache.storm.topology.IStatefulBolt#initState(State)}
+ * is invoked with the previously saved state of the bolt after prepare, before the execute() method is invoked.
+ * </p>
+ */
+public class StatefulWindowingTopology {
+
+ private static class WindowSumBolt extends BaseStatefulWindowedBolt<KeyValueState<String, Long>> {
+ private KeyValueState<String, Long> state;
+ private long sum;
+
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void initState(KeyValueState<String, Long> state) {
+ this.state = state;
+ sum = state.get("sum", 0L);
+ System.out.println("initState with state [" + state + "] current sum [" + sum + "]");
--- End diff --
Could we use slf4j here instead of `System.out`?
> Checkpoint window evaluated state and use this to prune duplicate evaluations
> -----------------------------------------------------------------------------
>
> Key: STORM-1176
> URL: https://issues.apache.org/jira/browse/STORM-1176
> Project: Apache Storm
> Issue Type: Sub-task
> Reporter: Arun Mahadevan
> Assignee: Arun Mahadevan
>
> Evaluated state of sliding/tumbling windows should be checkpointed
> periodically and on event replay during restart this info should be used to
> prune duplicate evaluations.
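
For readers following the issue, the idea in the description can be sketched in plain Java without any Storm classes. This is only an illustration under stated assumptions, not the code in the pull request: the class name `WindowCheckpointSketch`, the `"sum"` and `"lastWindow"` keys, the numeric window ids, and the in-memory `Map` standing in for a durable key-value state are all hypothetical. It also checkpoints after every window rather than periodically, to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: NOT Storm's implementation of STORM-1176.
class WindowCheckpointSketch {
    // Stand-in for a durable key-value state that survives a restart.
    private final Map<String, Long> store;
    private long sum;
    private long lastEvaluatedWindow;

    WindowCheckpointSketch(Map<String, Long> store) {
        this.store = store;
        // Restore the previously saved state, as initState would on restart.
        this.sum = store.getOrDefault("sum", 0L);
        this.lastEvaluatedWindow = store.getOrDefault("lastWindow", -1L);
    }

    // Returns true if the window was evaluated, false if it was pruned
    // because a checkpoint shows it was already evaluated.
    boolean evaluate(long windowId, long[] values) {
        if (windowId <= lastEvaluatedWindow) {
            return false; // replayed window: skip duplicate evaluation
        }
        for (long v : values) {
            sum += v;
        }
        lastEvaluatedWindow = windowId;
        // Checkpoint the result together with the window boundary
        // (simplification: every window, not periodically).
        store.put("sum", sum);
        store.put("lastWindow", lastEvaluatedWindow);
        return true;
    }

    long sum() {
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        WindowCheckpointSketch bolt = new WindowCheckpointSketch(store);
        bolt.evaluate(0, new long[] {1, 2, 3});
        bolt.evaluate(1, new long[] {4, 5});

        // Simulated restart: a new instance over the same store, with
        // windows 0 and 1 replayed before the new window 2 arrives.
        WindowCheckpointSketch restarted = new WindowCheckpointSketch(store);
        System.out.println(restarted.evaluate(0, new long[] {1, 2, 3})); // false
        System.out.println(restarted.evaluate(1, new long[] {4, 5}));    // false
        System.out.println(restarted.evaluate(2, new long[] {6}));       // true
        System.out.println(restarted.sum());                             // 21
    }
}
```

The point of storing the window boundary next to the sum is that a replayed window can be recognized and skipped, so the restored sum is not counted twice.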