[streaming] Updated connector type handling to support generic classes via the GenericSourceFunction interface
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3f1af0e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3f1af0e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3f1af0e3 Branch: refs/heads/master Commit: 3f1af0e38836b83cc50ab3ad895841138ca7f5d5 Parents: 2425885 Author: Gyula Fora <[email protected]> Authored: Thu Dec 25 18:00:26 2014 +0100 Committer: Gyula Fora <[email protected]> Committed: Sat Jan 3 21:44:42 2015 +0100 ---------------------------------------------------------------------- .../streaming/connectors/ConnectorSource.java | 42 ++++++++++++++++++++ .../streaming/connectors/flume/FlumeSink.java | 20 +++++----- .../streaming/connectors/flume/FlumeSource.java | 15 ++++--- .../connectors/flume/FlumeTopology.java | 8 ++-- .../streaming/connectors/kafka/KafkaSink.java | 8 ++-- .../streaming/connectors/kafka/KafkaSource.java | 15 ++++--- .../connectors/kafka/KafkaTopology.java | 21 ++-------- .../streaming/connectors/rabbitmq/RMQSink.java | 10 ++--- .../connectors/rabbitmq/RMQSource.java | 16 ++++---- .../connectors/rabbitmq/RMQTopology.java | 8 ++-- .../connectors/util/DeserializationSchema.java | 42 ++++++++++++++++++++ .../connectors/util/DeserializationScheme.java | 42 -------------------- .../streaming/connectors/util/RawSchema.java | 39 ++++++++++++++++++ .../streaming/connectors/util/RawScheme.java | 39 ------------------ .../connectors/util/SerializationSchema.java | 33 +++++++++++++++ .../connectors/util/SerializationScheme.java | 33 --------------- .../connectors/util/SimpleStringSchema.java | 40 +++++++++++++++++++ .../connectors/util/SimpleStringScheme.java | 40 ------------------- .../environment/StreamExecutionEnvironment.java | 10 ++++- .../function/source/GenericSourceFunction.java | 25 ++++++++++++ 20 files changed, 282 insertions(+), 224 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java new file mode 100644 index 0000000..1623943 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.function.source.GenericSourceFunction; +import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction; +import org.apache.flink.streaming.connectors.util.DeserializationSchema; + +public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements + GenericSourceFunction<OUT> { + + private static final long serialVersionUID = 1L; + protected DeserializationSchema<OUT> schema; + + public ConnectorSource(DeserializationSchema<OUT> schema) { + this.schema = schema; + } + + @Override + public TypeInformation<OUT> getType() { + return TypeExtractor.createTypeInfo(DeserializationSchema.class, schema.getClass(), 0, + null, null); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java index 3e68c45..8a2f2b8 100644 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java @@ -17,9 +17,10 @@ package org.apache.flink.streaming.connectors.flume; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import 
org.apache.flink.streaming.api.function.sink.RichSinkFunction; -import org.apache.flink.streaming.connectors.util.SerializationScheme; +import org.apache.flink.streaming.connectors.util.SerializationSchema; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; @@ -38,12 +39,12 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> { boolean initDone = false; String host; int port; - SerializationScheme<IN, byte[]> scheme; + SerializationSchema<IN, byte[]> scheme; - public FlumeSink(String host, int port, SerializationScheme<IN, byte[]> scheme) { + public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) { this.host = host; this.port = port; - this.scheme = scheme; + this.scheme = schema; } /** @@ -56,11 +57,6 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> { @Override public void invoke(IN value) { - if (!initDone) { - client = new FlinkRpcClientFacade(); - client.init(host, port); - } - byte[] data = scheme.serialize(value); client.sendDataToFlume(data); @@ -136,4 +132,10 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> { public void close() { client.client.close(); } + + @Override + public void open(Configuration config) { + client = new FlinkRpcClientFacade(); + client.init(host, port); + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java index ceb2be8..4f6ec2d 100644 --- 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java @@ -20,8 +20,8 @@ package org.apache.flink.streaming.connectors.flume; import java.util.List; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.function.source.ParallelSourceFunction; -import org.apache.flink.streaming.connectors.util.DeserializationScheme; +import org.apache.flink.streaming.connectors.ConnectorSource; +import org.apache.flink.streaming.connectors.util.DeserializationSchema; import org.apache.flink.util.Collector; import org.apache.flume.Context; import org.apache.flume.channel.ChannelProcessor; @@ -29,18 +29,17 @@ import org.apache.flume.source.AvroSource; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.flume.source.avro.Status; -public class FlumeSource<OUT> implements ParallelSourceFunction<OUT> { +public class FlumeSource<OUT> extends ConnectorSource<OUT> { private static final long serialVersionUID = 1L; String host; String port; - DeserializationScheme<OUT> scheme; volatile boolean finished = false; - FlumeSource(String host, int port, DeserializationScheme<OUT> scheme) { + FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) { + super(deserializationSchema); this.host = host; this.port = Integer.toString(port); - this.scheme = scheme; } public class MyAvroSource extends AvroSource { @@ -87,9 +86,9 @@ public class FlumeSource<OUT> implements ParallelSourceFunction<OUT> { */ private void collect(AvroFlumeEvent avroEvent) { byte[] b = avroEvent.getBody().array(); - OUT out = FlumeSource.this.scheme.deserialize(b); + OUT out = FlumeSource.this.schema.deserialize(b); - if (scheme.isEndOfStream(out)) { + if (schema.isEndOfStream(out)) { FlumeSource.this.finished = true; 
this.stop(); FlumeSource.this.notifyAll(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java index 366b1a5..3cfd7d4 100644 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.flume; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.util.SerializationScheme; -import org.apache.flink.streaming.connectors.util.SimpleStringScheme; +import org.apache.flink.streaming.connectors.util.SerializationSchema; +import org.apache.flink.streaming.connectors.util.SimpleStringSchema; public class FlumeTopology { @@ -30,13 +30,13 @@ public class FlumeTopology { @SuppressWarnings("unused") DataStream<String> dataStream1 = env.addSource( - new FlumeSource<String>("localhost", 41414, new SimpleStringScheme())).addSink( + new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink( new FlumeSink<String>("localhost", 42424, new StringToByteSerializer())); env.execute(); } - public static class StringToByteSerializer implements SerializationScheme<String, byte[]> { + public static class StringToByteSerializer implements SerializationSchema<String, byte[]> { 
private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java index 71e25f2..9bb87a0 100644 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java @@ -24,7 +24,7 @@ import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.flink.streaming.api.function.sink.RichSinkFunction; -import org.apache.flink.streaming.connectors.util.SerializationScheme; +import org.apache.flink.streaming.connectors.util.SerializationSchema; public class KafkaSink<IN, OUT> extends RichSinkFunction<IN> { private static final long serialVersionUID = 1L; @@ -34,13 +34,13 @@ public class KafkaSink<IN, OUT> extends RichSinkFunction<IN> { private String topicId; private String brokerAddr; private boolean initDone = false; - private SerializationScheme<IN, OUT> scheme; + private SerializationSchema<IN, OUT> scheme; public KafkaSink(String topicId, String brokerAddr, - SerializationScheme<IN, OUT> serializationScheme) { + SerializationSchema<IN, OUT> serializationSchema) { this.topicId = topicId; this.brokerAddr = brokerAddr; - this.scheme = serializationScheme; + this.scheme = serializationSchema; } 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java index 49e9144..7328500 100644 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java @@ -29,27 +29,26 @@ import kafka.javaapi.consumer.ConsumerConnector; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction; -import org.apache.flink.streaming.connectors.util.DeserializationScheme; +import org.apache.flink.streaming.connectors.ConnectorSource; +import org.apache.flink.streaming.connectors.util.DeserializationSchema; import org.apache.flink.util.Collector; -public class KafkaSource<OUT> extends RichParallelSourceFunction<OUT> { +public class KafkaSource<OUT> extends ConnectorSource<OUT> { private static final long serialVersionUID = 1L; private final String zkQuorum; private final String groupId; private final String topicId; private ConsumerConnector consumer; - private DeserializationScheme<OUT> scheme; OUT outTuple; public KafkaSource(String zkQuorum, String groupId, String topicId, - DeserializationScheme<OUT> deserializationScheme) { + DeserializationSchema<OUT> deserializationSchema) { + super(deserializationSchema); this.zkQuorum = zkQuorum; this.groupId = groupId; 
this.topicId = topicId; - this.scheme = deserializationScheme; } /** @@ -81,8 +80,8 @@ public class KafkaSource<OUT> extends RichParallelSourceFunction<OUT> { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { - OUT out = scheme.deserialize(it.next().message()); - if (scheme.isEndOfStream(out)) { + OUT out = schema.deserialize(it.next().message()); + if (schema.isEndOfStream(out)) { break; } collector.collect(out); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java index 16f123e..7801d56 100644 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java @@ -19,15 +19,11 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.sink.SinkFunction; import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.connectors.util.SimpleStringScheme; +import org.apache.flink.streaming.connectors.util.SimpleStringSchema; import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class KafkaTopology { - private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTopology.class); public static final class MySource implements SourceFunction<String> { private static final long serialVersionUID = 1L; @@ -42,17 +38,6 @@ public class KafkaTopology { } } - public static final class MyKafkaPrintSink implements SinkFunction<String> { - private static final long serialVersionUID = 1L; - - @Override - public void invoke(String value) { - if (LOG.isInfoEnabled()) { - LOG.info("String: <{}> arrived from Kafka", value); - } - } - } - public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -61,11 +46,11 @@ public class KafkaTopology { DataStream<String> stream1 = env .addSource( new KafkaSource<String>("localhost:2181", "group", "test", - new SimpleStringScheme())).addSink(new MyKafkaPrintSink()); + new SimpleStringSchema())).print(); @SuppressWarnings("unused") DataStream<String> stream2 = env.addSource(new MySource()).addSink( - new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringScheme())); + new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringSchema())); env.execute(); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java index c4f4615..38c4f5f 100644 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java @@ -21,7 +21,7 @@ import java.io.IOException; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.function.sink.RichSinkFunction; -import org.apache.flink.streaming.connectors.util.SerializationScheme; +import org.apache.flink.streaming.connectors.util.SerializationSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,12 +39,12 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { private transient ConnectionFactory factory; private transient Connection connection; private transient Channel channel; - private SerializationScheme<IN, byte[]> scheme; + private SerializationSchema<IN, byte[]> scheme; - public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationScheme<IN, byte[]> scheme) { + public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) { this.HOST_NAME = HOST_NAME; this.QUEUE_NAME = QUEUE_NAME; - this.scheme = scheme; + this.scheme = schema; } /** @@ -56,6 +56,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { try { connection = factory.newConnection(); channel = connection.createChannel(); + channel.queueDeclare(QUEUE_NAME, false, false, false, null); } catch (IOException e) { throw new RuntimeException(e); @@ -71,7 +72,6 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { @Override public void invoke(IN value) { try { - channel.queueDeclare(QUEUE_NAME, false, false, false, null); byte[] msg = scheme.serialize(value); channel.basicPublish("", QUEUE_NAME, null, msg); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---------------------------------------------------------------------- diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 240deab..7ce864e 100755 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -21,8 +21,8 @@ import java.io.IOException; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction; -import org.apache.flink.streaming.connectors.util.DeserializationScheme; +import org.apache.flink.streaming.connectors.ConnectorSource; +import org.apache.flink.streaming.connectors.util.DeserializationSchema; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +32,7 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; -public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> { +public class RMQSource<OUT> extends ConnectorSource<OUT> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class); @@ -46,11 +46,11 @@ public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> { private transient QueueingConsumer consumer; private transient QueueingConsumer.Delivery delivery; - private DeserializationScheme<OUT> scheme; - OUT out; - public RMQSource(String HOST_NAME, String QUEUE_NAME, DeserializationScheme<OUT> scheme) { + public RMQSource(String HOST_NAME, String QUEUE_NAME, + DeserializationSchema<OUT> deserializationSchema) { + 
super(deserializationSchema); this.HOST_NAME = HOST_NAME; this.QUEUE_NAME = QUEUE_NAME; } @@ -92,8 +92,8 @@ public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> { } } - out = scheme.deserialize(delivery.getBody()); - if (scheme.isEndOfStream(out)) { + out = schema.deserialize(delivery.getBody()); + if (schema.isEndOfStream(out)) { break; } else { collector.collect(out); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java index 4f82b24..a6ca9ae 100755 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.rabbitmq; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.util.SerializationScheme; -import org.apache.flink.streaming.connectors.util.SimpleStringScheme; +import org.apache.flink.streaming.connectors.util.SerializationSchema; +import org.apache.flink.streaming.connectors.util.SimpleStringSchema; public class RMQTopology { @@ -30,7 +30,7 @@ public class RMQTopology { @SuppressWarnings("unused") DataStream<String> dataStream1 = env.addSource( - new RMQSource<String>("localhost", "hello", new SimpleStringScheme())).print(); + new 
RMQSource<String>("localhost", "hello", new SimpleStringSchema())).print(); @SuppressWarnings("unused") DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five", @@ -40,7 +40,7 @@ public class RMQTopology { env.execute(); } - public static class StringToByteSerializer implements SerializationScheme<String, byte[]> { + public static class StringToByteSerializer implements SerializationSchema<String, byte[]> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java new file mode 100644 index 0000000..4507a1d --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.util; + +import java.io.Serializable; + +public interface DeserializationSchema<T> extends Serializable { + + /** + * Deserializes the incoming data. + * + * @param message + * The incoming message in a byte array + * @return The deserialized message in the required format. + */ + public T deserialize(byte[] message); + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted + * + * @param nextElement + * The element to test for end signal + * @return The end signal, if true the stream shuts down + */ + public boolean isEndOfStream(T nextElement); +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java deleted file mode 100644 index 43420a3..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.util; - -import java.io.Serializable; - -public interface DeserializationScheme<T> extends Serializable { - - /** - * Deserializes the incoming data. - * - * @param message - * The incoming message in a byte array - * @return The deserialized message in the required format. - */ - public T deserialize(byte[] message); - - /** - * Method to decide whether the element signals the end of the stream. If - * true is returned the element won't be emitted - * - * @param nextElement - * The element to test for end signal - * @return The end signal, if true the stream shuts down - */ - public boolean isEndOfStream(T nextElement); -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java new file mode 100644 index 0000000..29c749a --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.util; + +public class RawSchema implements DeserializationSchema<byte[]>, + SerializationSchema<byte[], byte[]> { + + private static final long serialVersionUID = 1L; + + @Override + public byte[] deserialize(byte[] message) { + return message; + } + + @Override + public boolean isEndOfStream(byte[] nextElement) { + return false; + } + + @Override + public byte[] serialize(byte[] element) { + return element; + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java deleted file mode 100644 index 34db00e..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.util; - -public class RawScheme implements DeserializationScheme<byte[]>, - SerializationScheme<byte[], byte[]> { - - private static final long serialVersionUID = 1L; - - @Override - public byte[] deserialize(byte[] message) { - return message; - } - - @Override - public boolean isEndOfStream(byte[] nextElement) { - return false; - } - - @Override - public byte[] serialize(byte[] element) { - return element; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java new file mode 100644 index 0000000..7c32312 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java @@ -0,0 +1,33 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.util; + +import java.io.Serializable; + +public interface SerializationSchema<T,R> extends Serializable { + + /** + * Serializes the incoming element to a specified type. + * + * @param element + * The incoming element to be serialized + * @return The serialized element. 
+ */ + public R serialize(T element); + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java deleted file mode 100644 index bb25885..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.util; - -import java.io.Serializable; - -public interface SerializationScheme<T,R> extends Serializable { - - /** - * Serializes the incoming element to a specified type. - * - * @param element - * The incoming element to be serialized - * @return The serialized element. 
- */ - public R serialize(T element); - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java new file mode 100644 index 0000000..4b21580 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.util; + +public class SimpleStringSchema implements DeserializationSchema<String>, + SerializationSchema<String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public String deserialize(byte[] message) { + return new String(message); + } + + @Override + public boolean isEndOfStream(String nextElement) { + return false; + } + + @Override + public String serialize(String element) { + return element; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java deleted file mode 100644 index 9d1ce6c..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.util; - -public class SimpleStringScheme implements DeserializationScheme<String>, - SerializationScheme<String, String> { - - private static final long serialVersionUID = 1L; - - @Override - public String deserialize(byte[] message) { - return new String(message); - } - - @Override - public boolean isEndOfStream(String nextElement) { - return false; - } - - @Override - public String serialize(String element) { - return element; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index ecc2545..87f2287 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.function.source.FileSourceFunction; import org.apache.flink.streaming.api.function.source.FileStreamFunction; import org.apache.flink.streaming.api.function.source.FromElementsFunction; import org.apache.flink.streaming.api.function.source.GenSequenceFunction; +import org.apache.flink.streaming.api.function.source.GenericSourceFunction; import org.apache.flink.streaming.api.function.source.ParallelSourceFunction; import 
org.apache.flink.streaming.api.function.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction; @@ -414,12 +415,17 @@ public abstract class StreamExecutionEnvironment { * type of the returned stream * @return the data stream constructed */ + @SuppressWarnings("unchecked") private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> outTypeInfo, String sourceName) { if (outTypeInfo == null) { - outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(), - 0, null, null); + if (function instanceof GenericSourceFunction) { + outTypeInfo = ((GenericSourceFunction<OUT>) function).getType(); + } else { + outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class, + function.getClass(), 0, null, null); + } } boolean isParallel = function instanceof ParallelSourceFunction; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f1af0e3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java new file mode 100644 index 0000000..664d39a --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.source; + +import org.apache.flink.api.common.typeinfo.TypeInformation; + +public interface GenericSourceFunction<T> { + + public TypeInformation<T> getType(); +}
