[FLINK-1638] [streaming] Added persistent Kafka source

Exposed state registering in the public API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09aa841d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09aa841d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09aa841d

Branch: refs/heads/master
Commit: 09aa841d6b0b7ee407c8011f35ec7c6a274480a3
Parents: e7485c2
Author: mbalassi <mbala...@apache.org>
Authored: Thu Mar 5 11:13:00 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerExample.java  |  8 +-
 .../connectors/kafka/KafkaProducerExample.java  |  2 +-
 .../connectors/kafka/api/KafkaSource.java       |  2 +-
 .../api/simple/KafkaCustomOffsetSource.java     | 46 -----------
 .../kafka/api/simple/KafkaTopicCreator.java     | 67 ---------------
 .../kafka/api/simple/KafkaTopicFactory.java     | 67 +++++++++++++++
 .../kafka/api/simple/PersistentKafkaSource.java | 87 ++++++++++++++++++++
 .../kafka/api/simple/SimpleKafkaSource.java     | 10 ++-
 .../apache/flink/streaming/api/StreamGraph.java | 10 +--
 9 files changed, 170 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index 754b2b3..d9b03c9 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -19,10 +19,9 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import 
org.apache.flink.streaming.connectors.kafka.api.simple.KafkaCustomOffsetSource;
-import 
org.apache.flink.streaming.connectors.kafka.api.simple.SimpleKafkaSource;
+import 
org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.state.SimpleState;
 
 public class KafkaConsumerExample {
 
@@ -43,7 +42,8 @@ public class KafkaConsumerExample {
                                .addSource(
 //                                             new KafkaSource<String>(host + 
":" + port, topic, new JavaDefaultStringSchema()))
 //                                             new 
SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
-                                               new 
KafkaCustomOffsetSource<String>(topic, host, port, new 
JavaDefaultStringSchema()))
+                                               new 
PersistentKafkaSource<String>(topic, host, port, 10L, new 
JavaDefaultStringSchema()))
+                               .registerState("kafka", new SimpleState<Long>())
                                .setParallelism(3)
                                .print().setParallelism(3);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index 1cd1192..a17beb8 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -42,7 +42,7 @@ public class KafkaProducerExample {
                DataStream<String> stream1 = env.addSource(new 
SourceFunction<String>() {
                        @Override
                        public void run(Collector<String> collector) throws 
Exception {
-                               for (int i = 0; i < 100; i++) {
+                               for (int i = 0; i < 20; i++) {
                                        collector.collect("message #" + i);
                                        Thread.sleep(100L);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 3075608..4349081 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -78,7 +78,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
        }
 
        public KafkaSource(String zookeeperHost, String topicId,
-                                          DeserializationSchema<OUT> 
deserializationSchema, long zookeeperSyncTimeMillis){
+                                               DeserializationSchema<OUT> 
deserializationSchema, long zookeeperSyncTimeMillis){
                this(zookeeperHost, topicId, DEFAULT_GROUP_ID, 
deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java
deleted file mode 100644
index d90ff7c..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
-
-public class KafkaCustomOffsetSource<OUT> extends SimpleKafkaSource<OUT> {
-
-       /**
-        * Partition index is set automatically by instance id.
-        *
-        * @param topicId
-        * @param host
-        * @param port
-        * @param deserializationSchema
-        */
-       public KafkaCustomOffsetSource(String topicId, String host, int port, 
DeserializationSchema<OUT> deserializationSchema) {
-               super(topicId, host, port, deserializationSchema);
-       }
-
-       @Override
-       protected void setInitialOffset(Configuration config) {
-               iterator.initializeFromOffset(10);
-       }
-
-       @Override
-       protected void gotMessage(MessageWithOffset msg) {
-               System.out.println(msg.getOffset() + " :: " + 
schema.deserialize(msg.getMessage()));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java
deleted file mode 100644
index 9e12492..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import kafka.admin.AdminUtils;
-
-public class KafkaTopicCreator {
-
-       public static void createTopic(String zookeeperServer, String 
topicName, int numOfPartitions, int replicationFactor) {
-               createTopic(zookeeperServer, topicName, numOfPartitions, 
replicationFactor, new Properties(), 10000, 10000);
-       }
-
-       public static void createTopic(String zookeeperServer, String 
topicName, int numOfPartitions, int replicationFactor, Properties 
topicProperties, int sessionTimeoutMs, int connectionTimeoutMs) {
-               ZkClient zkClient = new ZkClient(zookeeperServer, 
sessionTimeoutMs, connectionTimeoutMs,
-                               new KafkaZKStringSerializer());
-
-               Properties topicConfig = new Properties();
-               AdminUtils.createTopic(zkClient, topicName, numOfPartitions, 
replicationFactor, topicConfig);
-       }
-
-       private static class KafkaZKStringSerializer implements ZkSerializer {
-
-               @Override
-               public byte[] serialize(Object data) throws ZkMarshallingError {
-                       try {
-                               return ((String) data).getBytes("UTF-8");
-                       } catch (UnsupportedEncodingException e) {
-                               throw new RuntimeException(e);
-                       }
-               }
-
-               @Override
-               public Object deserialize(byte[] bytes) throws 
ZkMarshallingError {
-                       if (bytes == null) {
-                               return null;
-                       } else {
-                               try {
-                                       return new String(bytes, "UTF-8");
-                               } catch (UnsupportedEncodingException e) {
-                                       throw new RuntimeException(e);
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
new file mode 100644
index 0000000..f949b9a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import kafka.admin.AdminUtils;
+
+/**
+ * Static helpers for creating Kafka topics through a ZooKeeper connection.
+ */
+public class KafkaTopicFactory {
+
+       /**
+        * Creates a topic with empty topic properties and 10000 ms ZooKeeper
+        * session and connection timeouts.
+        *
+        * @param zookeeperServer
+        *            ZooKeeper connection string
+        * @param topicName
+        *            Name of the topic to create
+        * @param numOfPartitions
+        *            Number of partitions of the new topic
+        * @param replicationFactor
+        *            Replication factor of the new topic
+        */
+       public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) {
+               createTopic(zookeeperServer, topicName, numOfPartitions, replicationFactor, new Properties(), 10000, 10000);
+       }
+
+       /**
+        * Creates a topic using the given topic properties and ZooKeeper timeouts.
+        * The ZooKeeper client opened for the call is always closed before returning.
+        *
+        * @param zookeeperServer
+        *            ZooKeeper connection string
+        * @param topicName
+        *            Name of the topic to create
+        * @param numOfPartitions
+        *            Number of partitions of the new topic
+        * @param replicationFactor
+        *            Replication factor of the new topic
+        * @param topicProperties
+        *            Topic level configuration; {@code null} is treated as empty
+        * @param sessionTimeoutMs
+        *            ZooKeeper session timeout in milliseconds
+        * @param connectionTimeoutMs
+        *            ZooKeeper connection timeout in milliseconds
+        */
+       public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor, Properties topicProperties, int sessionTimeoutMs, int connectionTimeoutMs) {
+               ZkClient zkClient = new ZkClient(zookeeperServer, sessionTimeoutMs, connectionTimeoutMs,
+                               new KafkaZKStringSerializer());
+
+               try {
+                       // Pass the caller supplied configuration on instead of silently dropping it.
+                       Properties topicConfig = topicProperties != null ? topicProperties : new Properties();
+                       AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
+               } finally {
+                       // Release the ZooKeeper connection even when topic creation fails.
+                       zkClient.close();
+               }
+       }
+
+       /**
+        * Serializer that stores ZooKeeper node data as UTF-8 encoded strings.
+        */
+       private static class KafkaZKStringSerializer implements ZkSerializer {
+
+               @Override
+               public byte[] serialize(Object data) throws ZkMarshallingError {
+                       try {
+                               return ((String) data).getBytes("UTF-8");
+                       } catch (UnsupportedEncodingException e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+
+               @Override
+               public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+                       if (bytes == null) {
+                               return null;
+                       } else {
+                               try {
+                                       return new String(bytes, "UTF-8");
+                               } catch (UnsupportedEncodingException e) {
+                                       throw new RuntimeException(e);
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
new file mode 100644
index 0000000..00d003a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.state.SimpleState;
+import org.apache.flink.util.Collector;
+
+/**
+ * Kafka source that starts reading from the offset held in the state registered
+ * under the name "kafka", or from a user supplied initial offset when that state
+ * holds no value, and checkpoints the current offset at regular record intervals.
+ */
+public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
+
+       /** Number of records emitted between two offset checkpoints. */
+       private static final long NUM_RECORDS_PER_CHECKPOINT = 1000;
+
+       private final long initialOffset;
+
+       private transient SimpleState<Long> kafkaOffSet;
+       private transient long checkpointCounter;
+
+       /**
+        * Partition index is set automatically by instance id.
+        *
+        * @param topicId
+        * @param host
+        * @param port
+        * @param initialOffset
+        *            Offset to start reading from when no offset is stored in the "kafka" state
+        * @param deserializationSchema
+        */
+       public PersistentKafkaSource(String topicId, String host, int port, long initialOffset, DeserializationSchema<OUT> deserializationSchema) {
+               super(topicId, host, port, deserializationSchema);
+               this.initialOffset = initialOffset;
+       }
+
+       /**
+        * Picks the starting offset (stored state if present, otherwise the initial
+        * offset), resets the record counter and then delegates to the parent source.
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       public void open(Configuration parameters) {
+               StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+               SimpleState<Long> lastKafkaOffSet = (SimpleState<Long>) context.getState("kafka");
+
+               // Fall back to the initial offset when no state or no stored value is available.
+               if (lastKafkaOffSet == null || lastKafkaOffSet.getState() == null) {
+                       kafkaOffSet = new SimpleState<Long>(initialOffset);
+               } else {
+                       kafkaOffSet = lastKafkaOffSet;
+               }
+
+               checkpointCounter = 0;
+               super.open(parameters);
+       }
+
+       @Override
+       protected void setInitialOffset(Configuration config) {
+               iterator.initializeFromOffset(kafkaOffSet.getState());
+       }
+
+       /** Debug hook: prints the offset and the deserialized message to standard out. */
+       @Override
+       protected void gotMessage(MessageWithOffset msg) {
+               System.out.println(msg.getOffset() + " :: " + schema.deserialize(msg.getMessage()));
+       }
+
+       /**
+        * Emits every message of the iterator and checkpoints the offset of the last
+        * emitted message once per {@link #NUM_RECORDS_PER_CHECKPOINT} records.
+        */
+       @Override
+       public void run(Collector<OUT> collector) throws Exception {
+               MessageWithOffset msg;
+               while (iterator.hasNext()) {
+                       msg = iterator.nextWithOffset();
+                       gotMessage(msg);
+                       OUT out = schema.deserialize(msg.getMessage());
+                       collector.collect(out);
+
+                       // Count the emitted record; without this the checkpoint branch is never reached.
+                       checkpointCounter++;
+                       if (checkpointCounter >= NUM_RECORDS_PER_CHECKPOINT) {
+                               kafkaOffSet = new SimpleState<Long>(msg.getOffset());
+                               kafkaOffSet.checkpoint();
+                               // Start counting towards the next checkpoint.
+                               checkpointCounter = 0;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
index a721dee..473585c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
@@ -38,8 +38,7 @@ public class SimpleKafkaSource<OUT> extends 
ConnectorSource<OUT> {
         * @param port
         * @param deserializationSchema
         */
-       public SimpleKafkaSource(String topicId,
-                                                        String host, int port, 
DeserializationSchema<OUT> deserializationSchema) {
+       public SimpleKafkaSource(String topicId, String host, int port, 
DeserializationSchema<OUT> deserializationSchema) {
                super(deserializationSchema);
                this.topicId = topicId;
                this.host = host;
@@ -55,12 +54,13 @@ public class SimpleKafkaSource<OUT> extends 
ConnectorSource<OUT> {
                iterator.initializeFromCurrent();
        }
 
+       // This is just for debug purposes
        protected void gotMessage(MessageWithOffset msg) {
        }
 
        @SuppressWarnings("unchecked")
        @Override
-       public void invoke(Collector<OUT> collector) throws Exception {
+       public void run(Collector<OUT> collector) throws Exception {
                while (iterator.hasNext()) {
                        MessageWithOffset msg = iterator.nextWithOffset();
                        gotMessage(msg);
@@ -69,6 +69,10 @@ public class SimpleKafkaSource<OUT> extends 
ConnectorSource<OUT> {
                }
        }
 
+       @Override
+       public void cancel() { // No-op: this implementation takes no action when cancellation is requested.
+       }
+
 
        @Override
        public void open(Configuration config) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 640416d..641708e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -245,8 +245,6 @@ public class StreamGraph extends StreamingPlan {
         *            Id of the iteration tail
         * @param iterationID
         *            ID of iteration for multiple iterations
-        * @param parallelism
-        *            Number of parallel instances created
         * @param waitTime
         *            Max waiting time for next record
         */
@@ -297,8 +295,6 @@ public class StreamGraph extends StreamingPlan {
         *            Name of the vertex
         * @param vertexClass
         *            The class of the vertex
-        * @param invokableObjectject
-        *            The user defined invokable object
         * @param operatorName
         *            Type of the user defined operator
         * @param parallelism
@@ -419,8 +415,8 @@ public class StreamGraph extends StreamingPlan {
                return this.bufferTimeouts.get(vertexID);
        }
 
-       public void addOperatorState(Integer veretxName, String stateName, 
OperatorState<?> state) {
-               Map<String, OperatorState<?>> states = 
operatorStates.get(veretxName);
+       public void addOperatorState(Integer vertexName, String stateName, 
OperatorState<?> state) {
+               Map<String, OperatorState<?>> states = 
operatorStates.get(vertexName);
                if (states == null) {
                        states = new HashMap<String, OperatorState<?>>();
                        states.put(stateName, state);
@@ -432,7 +428,7 @@ public class StreamGraph extends StreamingPlan {
                                states.put(stateName, state);
                        }
                }
-               operatorStates.put(veretxName, states);
+               operatorStates.put(vertexName, states);
        }
 
        /**

Reply via email to