[FLINK-1638] [streaming] Added persistent Kafka source. Exposed state registering in the public API.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09aa841d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09aa841d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09aa841d Branch: refs/heads/master Commit: 09aa841d6b0b7ee407c8011f35ec7c6a274480a3 Parents: e7485c2 Author: mbalassi <mbala...@apache.org> Authored: Thu Mar 5 11:13:00 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 10 14:58:49 2015 +0100 ---------------------------------------------------------------------- .../connectors/kafka/KafkaConsumerExample.java | 8 +- .../connectors/kafka/KafkaProducerExample.java | 2 +- .../connectors/kafka/api/KafkaSource.java | 2 +- .../api/simple/KafkaCustomOffsetSource.java | 46 ----------- .../kafka/api/simple/KafkaTopicCreator.java | 67 --------------- .../kafka/api/simple/KafkaTopicFactory.java | 67 +++++++++++++++ .../kafka/api/simple/PersistentKafkaSource.java | 87 ++++++++++++++++++++ .../kafka/api/simple/SimpleKafkaSource.java | 10 ++- .../apache/flink/streaming/api/StreamGraph.java | 10 +-- 9 files changed, 170 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java index 754b2b3..d9b03c9 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java +++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java @@ -19,10 +19,9 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.api.KafkaSource; -import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaCustomOffsetSource; -import org.apache.flink.streaming.connectors.kafka.api.simple.SimpleKafkaSource; +import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource; import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema; +import org.apache.flink.streaming.state.SimpleState; public class KafkaConsumerExample { @@ -43,7 +42,8 @@ public class KafkaConsumerExample { .addSource( // new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema())) // new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema())) - new KafkaCustomOffsetSource<String>(topic, host, port, new JavaDefaultStringSchema())) + new PersistentKafkaSource<String>(topic, host, port, 10L, new JavaDefaultStringSchema())) + .registerState("kafka", new SimpleState<Long>()) .setParallelism(3) .print().setParallelism(3); http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java index 1cd1192..a17beb8 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java @@ -42,7 +42,7 @@ public class KafkaProducerExample { DataStream<String> stream1 = env.addSource(new SourceFunction<String>() { @Override public void run(Collector<String> collector) throws Exception { - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 20; i++) { collector.collect("message #" + i); Thread.sleep(100L); } http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java index 3075608..4349081 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java @@ -78,7 +78,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> { } public KafkaSource(String zookeeperHost, String topicId, - DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis){ + DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis){ this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java deleted file mode 100644 index d90ff7c..0000000 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.api.simple; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.util.DeserializationSchema; - -public class KafkaCustomOffsetSource<OUT> extends SimpleKafkaSource<OUT> { - - /** - * Partition index is set automatically by instance id. - * - * @param topicId - * @param host - * @param port - * @param deserializationSchema - */ - public KafkaCustomOffsetSource(String topicId, String host, int port, DeserializationSchema<OUT> deserializationSchema) { - super(topicId, host, port, deserializationSchema); - } - - @Override - protected void setInitialOffset(Configuration config) { - iterator.initializeFromOffset(10); - } - - @Override - protected void gotMessage(MessageWithOffset msg) { - System.out.println(msg.getOffset() + " :: " + schema.deserialize(msg.getMessage())); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java deleted file mode 100644 index 9e12492..0000000 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.api.simple; - -import java.io.UnsupportedEncodingException; -import java.util.Properties; - -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.exception.ZkMarshallingError; -import org.I0Itec.zkclient.serialize.ZkSerializer; - -import kafka.admin.AdminUtils; - -public class KafkaTopicCreator { - - public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) { - createTopic(zookeeperServer, topicName, numOfPartitions, replicationFactor, new Properties(), 10000, 10000); - } - - public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor, Properties topicProperties, int sessionTimeoutMs, int connectionTimeoutMs) { - ZkClient zkClient = new ZkClient(zookeeperServer, sessionTimeoutMs, connectionTimeoutMs, - new KafkaZKStringSerializer()); - - Properties topicConfig = new Properties(); - AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig); - } - - private static class KafkaZKStringSerializer implements ZkSerializer { - - @Override - public byte[] serialize(Object data) throws ZkMarshallingError { - try { - return ((String) data).getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - - @Override - public Object 
deserialize(byte[] bytes) throws ZkMarshallingError { - if (bytes == null) { - return null; - } else { - try { - return new String(bytes, "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java new file mode 100644 index 0000000..f949b9a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.api.simple; + +import java.io.UnsupportedEncodingException; +import java.util.Properties; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.serialize.ZkSerializer; + +import kafka.admin.AdminUtils; + +public class KafkaTopicFactory { + + public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) { + createTopic(zookeeperServer, topicName, numOfPartitions, replicationFactor, new Properties(), 10000, 10000); + } + + public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor, Properties topicProperties, int sessionTimeoutMs, int connectionTimeoutMs) { + ZkClient zkClient = new ZkClient(zookeeperServer, sessionTimeoutMs, connectionTimeoutMs, + new KafkaZKStringSerializer()); + + Properties topicConfig = new Properties(); + AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig); + } + + private static class KafkaZKStringSerializer implements ZkSerializer { + + @Override + public byte[] serialize(Object data) throws ZkMarshallingError { + try { + return ((String) data).getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object deserialize(byte[] bytes) throws ZkMarshallingError { + if (bytes == null) { + return null; + } else { + try { + return new String(bytes, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java new file mode 100644 index 0000000..00d003a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.api.simple; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.util.DeserializationSchema; +import org.apache.flink.streaming.state.SimpleState; +import org.apache.flink.util.Collector; + +public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> { + + private static final long NUM_RECORDS_PER_CHECKPOINT = 1000; + + private long initialOffset; + + private transient SimpleState<Long> kafkaOffSet; + private transient long checkpointCounter; + + /** + * Partition index is set automatically by instance id. + * + * @param topicId + * @param host + * @param port + * @param deserializationSchema + */ + public PersistentKafkaSource(String topicId, String host, int port, long initialOffset, DeserializationSchema<OUT> deserializationSchema) { + super(topicId, host, port, deserializationSchema); + this.initialOffset = initialOffset; + } + + @Override + public void open(Configuration parameters) { + StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + SimpleState<Long> lastKafkaOffSet = (SimpleState<Long>) context.getState("kafka"); + + if (lastKafkaOffSet.getState() == null){ + kafkaOffSet = new SimpleState<Long>(initialOffset); + } else { + kafkaOffSet = lastKafkaOffSet; + } + + checkpointCounter = 0; + super.open(parameters); + } + + @Override + protected void setInitialOffset(Configuration config) { + iterator.initializeFromOffset(kafkaOffSet.getState()); + } + + @Override + protected void gotMessage(MessageWithOffset msg) { + System.out.println(msg.getOffset() + " :: " + schema.deserialize(msg.getMessage())); + } + + @Override + public void run(Collector<OUT> collector) throws Exception { + MessageWithOffset msg; + while (iterator.hasNext()) { + msg = iterator.nextWithOffset(); + gotMessage(msg); + OUT out = schema.deserialize(msg.getMessage()); 
+ collector.collect(out); + if (checkpointCounter > NUM_RECORDS_PER_CHECKPOINT){ + kafkaOffSet = new SimpleState<Long>(msg.getOffset()); + kafkaOffSet.checkpoint(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java index a721dee..473585c 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java @@ -38,8 +38,7 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> { * @param port * @param deserializationSchema */ - public SimpleKafkaSource(String topicId, - String host, int port, DeserializationSchema<OUT> deserializationSchema) { + public SimpleKafkaSource(String topicId, String host, int port, DeserializationSchema<OUT> deserializationSchema) { super(deserializationSchema); this.topicId = topicId; this.host = host; @@ -55,12 +54,13 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> { iterator.initializeFromCurrent(); } + //This just for debug purposes protected void gotMessage(MessageWithOffset msg) { } @SuppressWarnings("unchecked") @Override - public void invoke(Collector<OUT> collector) throws Exception { + public void run(Collector<OUT> collector) throws Exception { while (iterator.hasNext()) { MessageWithOffset msg = 
iterator.nextWithOffset(); gotMessage(msg); @@ -69,6 +69,10 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> { } } + @Override + public void cancel() { + } + @Override public void open(Configuration config) { http://git-wip-us.apache.org/repos/asf/flink/blob/09aa841d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index 640416d..641708e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -245,8 +245,6 @@ public class StreamGraph extends StreamingPlan { * Id of the iteration tail * @param iterationID * ID of iteration for mulitple iterations - * @param parallelism - * Number of parallel instances created * @param waitTime * Max waiting time for next record */ @@ -297,8 +295,6 @@ public class StreamGraph extends StreamingPlan { * Name of the vertex * @param vertexClass * The class of the vertex - * @param invokableObjectject - * The user defined invokable object * @param operatorName * Type of the user defined operator * @param parallelism @@ -419,8 +415,8 @@ public class StreamGraph extends StreamingPlan { return this.bufferTimeouts.get(vertexID); } - public void addOperatorState(Integer veretxName, String stateName, OperatorState<?> state) { - Map<String, OperatorState<?>> states = operatorStates.get(veretxName); + public void addOperatorState(Integer vertexName, String stateName, OperatorState<?> state) { + Map<String, OperatorState<?>> states = operatorStates.get(vertexName); if 
(states == null) { states = new HashMap<String, OperatorState<?>>(); states.put(stateName, state); @@ -432,7 +428,7 @@ public class StreamGraph extends StreamingPlan { states.put(stateName, state); } } - operatorStates.put(veretxName, states); + operatorStates.put(vertexName, states); } /**