[streaming] Updated connector type handling to support generic classes via the 
GenericSourceFunction interface


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3f1af0e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3f1af0e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3f1af0e3

Branch: refs/heads/master
Commit: 3f1af0e38836b83cc50ab3ad895841138ca7f5d5
Parents: 2425885
Author: Gyula Fora <[email protected]>
Authored: Thu Dec 25 18:00:26 2014 +0100
Committer: Gyula Fora <[email protected]>
Committed: Sat Jan 3 21:44:42 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/ConnectorSource.java   | 42 ++++++++++++++++++++
 .../streaming/connectors/flume/FlumeSink.java   | 20 +++++-----
 .../streaming/connectors/flume/FlumeSource.java | 15 ++++---
 .../connectors/flume/FlumeTopology.java         |  8 ++--
 .../streaming/connectors/kafka/KafkaSink.java   |  8 ++--
 .../streaming/connectors/kafka/KafkaSource.java | 15 ++++---
 .../connectors/kafka/KafkaTopology.java         | 21 ++--------
 .../streaming/connectors/rabbitmq/RMQSink.java  | 10 ++---
 .../connectors/rabbitmq/RMQSource.java          | 16 ++++----
 .../connectors/rabbitmq/RMQTopology.java        |  8 ++--
 .../connectors/util/DeserializationSchema.java  | 42 ++++++++++++++++++++
 .../connectors/util/DeserializationScheme.java  | 42 --------------------
 .../streaming/connectors/util/RawSchema.java    | 39 ++++++++++++++++++
 .../streaming/connectors/util/RawScheme.java    | 39 ------------------
 .../connectors/util/SerializationSchema.java    | 33 +++++++++++++++
 .../connectors/util/SerializationScheme.java    | 33 ---------------
 .../connectors/util/SimpleStringSchema.java     | 40 +++++++++++++++++++
 .../connectors/util/SimpleStringScheme.java     | 40 -------------------
 .../environment/StreamExecutionEnvironment.java | 10 ++++-
 .../function/source/GenericSourceFunction.java  | 25 ++++++++++++
 20 files changed, 282 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
new file mode 100644
index 0000000..1623943
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
+import 
org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+
+public abstract class ConnectorSource<OUT> extends 
RichParallelSourceFunction<OUT> implements
+               GenericSourceFunction<OUT> {
+
+       private static final long serialVersionUID = 1L;
+       protected DeserializationSchema<OUT> schema;
+
+       public ConnectorSource(DeserializationSchema<OUT> schema) {
+               this.schema = schema;
+       }
+
+       @Override
+       public TypeInformation<OUT> getType() {
+               return 
TypeExtractor.createTypeInfo(DeserializationSchema.class, schema.getClass(), 0,
+                               null, null);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 3e68c45..8a2f2b8 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -17,9 +17,10 @@
 
 package org.apache.flink.streaming.connectors.flume;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationScheme;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -38,12 +39,12 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
        boolean initDone = false;
        String host;
        int port;
-       SerializationScheme<IN, byte[]> scheme;
+       SerializationSchema<IN, byte[]> scheme;
 
-       public FlumeSink(String host, int port, SerializationScheme<IN, byte[]> 
scheme) {
+       public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> 
schema) {
                this.host = host;
                this.port = port;
-               this.scheme = scheme;
+               this.scheme = schema;
        }
 
        /**
@@ -56,11 +57,6 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
        @Override
        public void invoke(IN value) {
 
-               if (!initDone) {
-                       client = new FlinkRpcClientFacade();
-                       client.init(host, port);
-               }
-
                byte[] data = scheme.serialize(value);
                client.sendDataToFlume(data);
 
@@ -136,4 +132,10 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
        public void close() {
                client.client.close();
        }
+
+       @Override
+       public void open(Configuration config) {
+               client = new FlinkRpcClientFacade();
+               client.init(host, port);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index ceb2be8..4f6ec2d 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.connectors.flume;
 import java.util.List;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
-import org.apache.flink.streaming.connectors.util.DeserializationScheme;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.apache.flume.Context;
 import org.apache.flume.channel.ChannelProcessor;
@@ -29,18 +29,17 @@ import org.apache.flume.source.AvroSource;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.Status;
 
-public class FlumeSource<OUT> implements ParallelSourceFunction<OUT> {
+public class FlumeSource<OUT> extends ConnectorSource<OUT> {
        private static final long serialVersionUID = 1L;
 
        String host;
        String port;
-       DeserializationScheme<OUT> scheme;
        volatile boolean finished = false;
 
-       FlumeSource(String host, int port, DeserializationScheme<OUT> scheme) {
+       FlumeSource(String host, int port, DeserializationSchema<OUT> 
deserializationSchema) {
+               super(deserializationSchema);
                this.host = host;
                this.port = Integer.toString(port);
-               this.scheme = scheme;
        }
 
        public class MyAvroSource extends AvroSource {
@@ -87,9 +86,9 @@ public class FlumeSource<OUT> implements 
ParallelSourceFunction<OUT> {
                 */
                private void collect(AvroFlumeEvent avroEvent) {
                        byte[] b = avroEvent.getBody().array();
-                       OUT out = FlumeSource.this.scheme.deserialize(b);
+                       OUT out = FlumeSource.this.schema.deserialize(b);
 
-                       if (scheme.isEndOfStream(out)) {
+                       if (schema.isEndOfStream(out)) {
                                FlumeSource.this.finished = true;
                                this.stop();
                                FlumeSource.this.notifyAll();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 366b1a5..3cfd7d4 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationScheme;
-import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
 
 public class FlumeTopology {
 
@@ -30,13 +30,13 @@ public class FlumeTopology {
 
                @SuppressWarnings("unused")
                DataStream<String> dataStream1 = env.addSource(
-                               new FlumeSource<String>("localhost", 41414, new 
SimpleStringScheme())).addSink(
+                               new FlumeSource<String>("localhost", 41414, new 
SimpleStringSchema())).addSink(
                                new FlumeSink<String>("localhost", 42424, new 
StringToByteSerializer()));
 
                env.execute();
        }
 
-       public static class StringToByteSerializer implements 
SerializationScheme<String, byte[]> {
+       public static class StringToByteSerializer implements 
SerializationSchema<String, byte[]> {
 
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index 71e25f2..9bb87a0 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -24,7 +24,7 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationScheme;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
 
 public class KafkaSink<IN, OUT> extends RichSinkFunction<IN> {
        private static final long serialVersionUID = 1L;
@@ -34,13 +34,13 @@ public class KafkaSink<IN, OUT> extends 
RichSinkFunction<IN> {
        private String topicId;
        private String brokerAddr;
        private boolean initDone = false;
-       private SerializationScheme<IN, OUT> scheme;
+       private SerializationSchema<IN, OUT> scheme;
 
        public KafkaSink(String topicId, String brokerAddr,
-                       SerializationScheme<IN, OUT> serializationScheme) {
+                       SerializationSchema<IN, OUT> serializationSchema) {
                this.topicId = topicId;
                this.brokerAddr = brokerAddr;
-               this.scheme = serializationScheme;
+               this.scheme = serializationSchema;
 
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index 49e9144..7328500 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -29,27 +29,26 @@ import kafka.javaapi.consumer.ConsumerConnector;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import 
org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.util.DeserializationScheme;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 
-public class KafkaSource<OUT> extends RichParallelSourceFunction<OUT> {
+public class KafkaSource<OUT> extends ConnectorSource<OUT> {
        private static final long serialVersionUID = 1L;
 
        private final String zkQuorum;
        private final String groupId;
        private final String topicId;
        private ConsumerConnector consumer;
-       private DeserializationScheme<OUT> scheme;
 
        OUT outTuple;
 
        public KafkaSource(String zkQuorum, String groupId, String topicId,
-                       DeserializationScheme<OUT> deserializationScheme) {
+                       DeserializationSchema<OUT> deserializationSchema) {
+               super(deserializationSchema);
                this.zkQuorum = zkQuorum;
                this.groupId = groupId;
                this.topicId = topicId;
-               this.scheme = deserializationScheme;
        }
 
        /**
@@ -81,8 +80,8 @@ public class KafkaSource<OUT> extends 
RichParallelSourceFunction<OUT> {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
 
                while (it.hasNext()) {
-                       OUT out = scheme.deserialize(it.next().message());
-                       if (scheme.isEndOfStream(out)) {
+                       OUT out = schema.deserialize(it.next().message());
+                       if (schema.isEndOfStream(out)) {
                                break;
                        }
                        collector.collect(out);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 16f123e..7801d56 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -19,15 +19,11 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
+import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class KafkaTopology {
-       private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTopology.class);
 
        public static final class MySource implements SourceFunction<String> {
                private static final long serialVersionUID = 1L;
@@ -42,17 +38,6 @@ public class KafkaTopology {
                }
        }
 
-       public static final class MyKafkaPrintSink implements 
SinkFunction<String> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void invoke(String value) {
-                       if (LOG.isInfoEnabled()) {
-                               LOG.info("String: <{}> arrived from Kafka", 
value);
-                       }
-               }
-       }
-
        public static void main(String[] args) throws Exception {
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -61,11 +46,11 @@ public class KafkaTopology {
                DataStream<String> stream1 = env
                                .addSource(
                                                new 
KafkaSource<String>("localhost:2181", "group", "test",
-                                                               new 
SimpleStringScheme())).addSink(new MyKafkaPrintSink());
+                                                               new 
SimpleStringSchema())).print();
 
                @SuppressWarnings("unused")
                DataStream<String> stream2 = env.addSource(new 
MySource()).addSink(
-                               new KafkaSink<String, String>("test", 
"localhost:9092", new SimpleStringScheme()));
+                               new KafkaSink<String, String>("test", 
"localhost:9092", new SimpleStringSchema()));
 
                env.execute();
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index c4f4615..38c4f5f 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationScheme;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,12 +39,12 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
        private transient ConnectionFactory factory;
        private transient Connection connection;
        private transient Channel channel;
-       private SerializationScheme<IN, byte[]> scheme;
+       private SerializationSchema<IN, byte[]> scheme;
 
-       public RMQSink(String HOST_NAME, String QUEUE_NAME, 
SerializationScheme<IN, byte[]> scheme) {
+       public RMQSink(String HOST_NAME, String QUEUE_NAME, 
SerializationSchema<IN, byte[]> schema) {
                this.HOST_NAME = HOST_NAME;
                this.QUEUE_NAME = QUEUE_NAME;
-               this.scheme = scheme;
+               this.scheme = schema;
        }
 
        /**
@@ -56,6 +56,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
                try {
                        connection = factory.newConnection();
                        channel = connection.createChannel();
+                       channel.queueDeclare(QUEUE_NAME, false, false, false, 
null);
 
                } catch (IOException e) {
                        throw new RuntimeException(e);
@@ -71,7 +72,6 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
        @Override
        public void invoke(IN value) {
                try {
-                       channel.queueDeclare(QUEUE_NAME, false, false, false, 
null);
                        byte[] msg = scheme.serialize(value);
 
                        channel.basicPublish("", QUEUE_NAME, null, msg);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 240deab..7ce864e 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import 
org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.util.DeserializationScheme;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +32,7 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
 
-public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> {
+public class RMQSource<OUT> extends ConnectorSource<OUT> {
        private static final long serialVersionUID = 1L;
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RMQSource.class);
@@ -46,11 +46,11 @@ public class RMQSource<OUT> extends 
RichParallelSourceFunction<OUT> {
        private transient QueueingConsumer consumer;
        private transient QueueingConsumer.Delivery delivery;
 
-       private DeserializationScheme<OUT> scheme;
-
        OUT out;
 
-       public RMQSource(String HOST_NAME, String QUEUE_NAME, 
DeserializationScheme<OUT> scheme) {
+       public RMQSource(String HOST_NAME, String QUEUE_NAME,
+                       DeserializationSchema<OUT> deserializationSchema) {
+               super(deserializationSchema);
                this.HOST_NAME = HOST_NAME;
                this.QUEUE_NAME = QUEUE_NAME;
        }
@@ -92,8 +92,8 @@ public class RMQSource<OUT> extends 
RichParallelSourceFunction<OUT> {
                                }
                        }
 
-                       out = scheme.deserialize(delivery.getBody());
-                       if (scheme.isEndOfStream(out)) {
+                       out = schema.deserialize(delivery.getBody());
+                       if (schema.isEndOfStream(out)) {
                                break;
                        } else {
                                collector.collect(out);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index 4f82b24..a6ca9ae 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationScheme;
-import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
 
 public class RMQTopology {
 
@@ -30,7 +30,7 @@ public class RMQTopology {
 
                @SuppressWarnings("unused")
                DataStream<String> dataStream1 = env.addSource(
-                               new RMQSource<String>("localhost", "hello", new 
SimpleStringScheme())).print();
+                               new RMQSource<String>("localhost", "hello", new 
SimpleStringSchema())).print();
 
                @SuppressWarnings("unused")
                DataStream<String> dataStream2 = env.fromElements("one", "two", 
"three", "four", "five",
@@ -40,7 +40,7 @@ public class RMQTopology {
                env.execute();
        }
 
-       public static class StringToByteSerializer implements 
SerializationScheme<String, byte[]> {
+       public static class StringToByteSerializer implements 
SerializationSchema<String, byte[]> {
 
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
new file mode 100644
index 0000000..4507a1d
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+import java.io.Serializable;
+
+public interface DeserializationSchema<T> extends Serializable {
+
+       /**
+        * Deserializes the incoming data.
+        * 
+        * @param message
+        *            The incoming message in a byte array
+        * @return The deserialized message in the required format.
+        */
+       public T deserialize(byte[] message);
+
+       /**
+        * Method to decide whether the element signals the end of the stream. 
If
+        * true is returned the element won't be emitted
+        * 
+        * @param nextElement
+        *            The element to test for end signal
+        * @return The end signal, if true the stream shuts down
+        */
+       public boolean isEndOfStream(T nextElement);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java
deleted file mode 100644
index 43420a3..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface DeserializationScheme<T> extends Serializable {
-
-       /**
-        * Deserializes the incoming data.
-        * 
-        * @param message
-        *            The incoming message in a byte array
-        * @return The deserialized message in the required format.
-        */
-       public T deserialize(byte[] message);
-
-       /**
-        * Method to decide whether the element signals the end of the stream. 
If
-        * true is returned the element won't be emitted
-        * 
-        * @param nextElement
-        *            The element to test for end signal
-        * @return The end signal, if true the stream shuts down
-        */
-       public boolean isEndOfStream(T nextElement);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
new file mode 100644
index 0000000..29c749a
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+/**
+ * Pass-through schema for raw byte arrays: received messages are handed on
+ * exactly as they arrive and outgoing elements are written exactly as given.
+ */
+public class RawSchema implements DeserializationSchema<byte[]>,
+               SerializationSchema<byte[], byte[]> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Returns the given element itself; no copy is made.
+        */
+       @Override
+       public byte[] serialize(byte[] element) {
+               return element;
+       }
+
+       /**
+        * Returns the received message itself; no copy is made.
+        */
+       @Override
+       public byte[] deserialize(byte[] message) {
+               return message;
+       }
+
+       /**
+        * Always returns false: this schema never signals the end of the stream.
+        */
+       @Override
+       public boolean isEndOfStream(byte[] nextElement) {
+               return false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java
deleted file mode 100644
index 34db00e..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class RawScheme implements DeserializationScheme<byte[]>,
-               SerializationScheme<byte[], byte[]> {
-
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public byte[] deserialize(byte[] message) {
-               return message;
-       }
-
-       @Override
-       public boolean isEndOfStream(byte[] nextElement) {
-               return false;
-       }
-
-       @Override
-       public byte[] serialize(byte[] element) {
-               return element;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
new file mode 100644
index 0000000..7c32312
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+import java.io.Serializable;
+
+/**
+ * Describes how elements of type {@code T} are converted into the
+ * representation {@code R} that is handed to the external system.
+ *
+ * @param <T>
+ *            The type of the elements to serialize
+ * @param <R>
+ *            The type of the serialized representation
+ */
+public interface SerializationSchema<T, R> extends Serializable {
+
+       /**
+        * Converts an element into its serialized form.
+        *
+        * @param element
+        *            The element to serialize
+        * @return The serialized representation of the element
+        */
+       R serialize(T element);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java
deleted file mode 100644
index bb25885..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface SerializationScheme<T,R> extends Serializable {
-
-       /**
-        * Serializes the incoming element to a specified type.
-        * 
-        * @param element
-        *            The incoming element to be serialized
-        * @return The serialized element.
-        */
-       public R serialize(T element);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
new file mode 100644
index 0000000..4b21580
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+/**
+ * Schema for plain text messages: incoming bytes are decoded into a
+ * {@link String} and outgoing strings are passed through unchanged.
+ */
+public class SimpleStringSchema implements DeserializationSchema<String>,
+               SerializationSchema<String, String> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Charset used to decode incoming messages. It is fixed to UTF-8 so that
+        * the decoded text does not depend on the platform default charset of
+        * the JVM the source happens to run on.
+        */
+       private static final java.nio.charset.Charset CHARSET =
+                       java.nio.charset.Charset.forName("UTF-8");
+
+       /**
+        * Decodes the message bytes as UTF-8 text.
+        *
+        * @param message
+        *            The incoming message in a byte array
+        * @return The message decoded into a String
+        */
+       @Override
+       public String deserialize(byte[] message) {
+               return new String(message, CHARSET);
+       }
+
+       /**
+        * Always returns false: this schema never signals the end of the stream.
+        */
+       @Override
+       public boolean isEndOfStream(String nextElement) {
+               return false;
+       }
+
+       /**
+        * Returns the element itself; strings need no conversion.
+        */
+       @Override
+       public String serialize(String element) {
+               return element;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java
deleted file mode 100644
index 9d1ce6c..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class SimpleStringScheme implements DeserializationScheme<String>,
-               SerializationScheme<String, String> {
-
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public String deserialize(byte[] message) {
-               return new String(message);
-       }
-
-       @Override
-       public boolean isEndOfStream(String nextElement) {
-               return false;
-       }
-
-       @Override
-       public String serialize(String element) {
-               return element;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ecc2545..87f2287 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.function.source.FileSourceFunction;
 import org.apache.flink.streaming.api.function.source.FileStreamFunction;
 import org.apache.flink.streaming.api.function.source.FromElementsFunction;
 import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
 import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
@@ -414,12 +415,17 @@ public abstract class StreamExecutionEnvironment {
         *            type of the returned stream
         * @return the data stream constructed
         */
+       @SuppressWarnings("unchecked")
        private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
                        TypeInformation<OUT> outTypeInfo, String sourceName) {
 
                if (outTypeInfo == null) {
-                       outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(),
-                                       0, null, null);
+                       if (function instanceof GenericSourceFunction) {
+                               outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
+                       } else {
+                               outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
+                                               function.getClass(), 0, null, null);
+                       }
                }
 
                boolean isParallel = function instanceof ParallelSourceFunction;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
new file mode 100644
index 0000000..664d39a
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Implemented by source functions that can report the type of the elements
+ * they produce. When a source is added without explicit type information,
+ * StreamExecutionEnvironment uses the value returned by {@link #getType()}
+ * instead of extracting the type from the function class.
+ *
+ * @param <T>
+ *            The type of the elements produced by the source
+ */
+public interface GenericSourceFunction<T> {
+
+       /**
+        * Returns the type information of the elements produced by the source.
+        *
+        * @return The type information describing {@code T}
+        */
+       TypeInformation<T> getType();
+}

Reply via email to