http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 30f6dc2..de72985 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.ConfigConstants;
@@ -32,7 +33,6 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.AfterClass;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index dcf3167..3138152 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.types.Row;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 218401c..7a882f4 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 21171f8..6851474 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,17 +17,18 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
 import kafka.server.KafkaServer;
+
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index b204ea9..e432a65 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.connectors.kafka.testutils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -36,8 +38,6 @@ import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
 import java.util.Collection;
 import java.util.Properties;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index b3c6f8c..c1118ed 100644
--- 
a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ 
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
 
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 5018bcf..d454153 100644
--- 
a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ 
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.rabbitmq;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
@@ -25,7 +26,6 @@ import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourc
 import 
org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Preconditions;
 
 import com.rabbitmq.client.Channel;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
index 93f884b..53b834d 100644
--- 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
+++ 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 0996355..bbf893f 100644
--- 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.rabbitmq;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -32,7 +33,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 5b97273..ae3f56e 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -107,6 +107,9 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- Joda and jackson are used to test that serialization and 
type extraction
+                       work with types from those libraries -->
+
                <dependency>
                        <groupId>joda-time</groupId>
                        <artifactId>joda-time</artifactId>
@@ -118,6 +121,13 @@ under the License.
                        <artifactId>joda-convert</artifactId>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-jackson</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
     </dependencies>
 
        <profiles>
@@ -208,6 +218,11 @@ under the License.
                                                        
<exclude>org.apache.flink.configuration.ConfigConstants#ENABLE_QUARANTINE_MONITOR</exclude>
                                                        
<exclude>org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_INITIAL_KEY</exclude>
                                                        
<exclude>org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_MAX_KEY</exclude>
+
+                                                       <!-- apparently there 
is a bug in the plugin which makes it fail on this new file, even though
+                                                               it's new, and 
not conflicting/breaking -->
+                                                       
<exclude>org.apache.flink.api.common.serialization.DeserializationSchema</exclude>
+                                                       
<exclude>org.apache.flink.api.common.serialization.SerializationSchema</exclude>
                                                </excludes>
                                        </parameter>
                                </configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
new file mode 100644
index 0000000..871b7b1
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema describes how to turn the byte messages 
delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala 
objects) that are
+ * processed by Flink.
+ *
+ * <p>This base variant of the deserialization schema produces the type 
information
+ * automatically by extracting it from the generic class arguments.
+ *
+ * @param <T> The type created by the deserialization schema.
+ */
+@PublicEvolving
+public abstract class AbstractDeserializationSchema<T> implements 
DeserializationSchema<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Deserializes the byte message.
+        *
+        * @param message The message, as a byte array.
+        * @return The deserialized message as an object.
+        */
+       @Override
+       public abstract T deserialize(byte[] message) throws IOException;
+
+       /**
+        * Method to decide whether the element signals the end of the stream. 
If
+        * true is returned, the element won't be emitted.
+        *
+        * <p>This default implementation always returns false, meaning the 
stream is interpreted
+        * to be unbounded.
+        *
+        * @param nextElement The element to test for the end-of-stream signal.
+        * @return True, if the element signals end of stream, false otherwise.
+        */
+       @Override
+       public boolean isEndOfStream(T nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<T> getProducedType() {
+               return 
TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 
0, null, null);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
new file mode 100644
index 0000000..9de4743
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the byte messages 
delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala 
objects) that are
+ * processed by Flink.
+ *
+ * <p>Note: In most cases, one should start from {@link 
AbstractDeserializationSchema}, which
+ * takes care of producing the return type information automatically.
+ *
+ * @param <T> The type created by the deserialization schema.
+ */
+@Public
+public interface DeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
+
+       /**
+        * Deserializes the byte message.
+        *
+        * @param message The message, as a byte array.
+        *
+        * @return The deserialized message as an object (null if the message 
cannot be deserialized).
+        */
+       T deserialize(byte[] message) throws IOException;
+
+       /**
+        * Method to decide whether the element signals the end of the stream. 
If
+        * true is returned, the element won't be emitted.
+        *
+        * @param nextElement The element to test for the end-of-stream signal.
+        * @return True, if the element signals end of stream, false otherwise.
+        */
+       boolean isEndOfStream(T nextElement);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java
new file mode 100644
index 0000000..3a4eaeb
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.Serializable;
+
+/**
+ * The serialization schema describes how to turn a data object into a 
different serialized
+ * representation. Most data sinks (for example Apache Kafka) require the data 
to be handed
+ * to them in a specific format (for example as byte strings).
+ *
+ * @param <T> The type to be serialized.
+ */
+@Public
+public interface SerializationSchema<T> extends Serializable {
+
+       /**
+        * Serializes the incoming element to a byte array.
+        *
+        * @param element
+        *            The incoming element to be serialized
+        * @return The serialized element, as a byte array.
+        */
+       byte[] serialize(T element);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java
new file mode 100644
index 0000000..3130a10
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Very simple serialization and deserialization schema for strings.
+ *
+ * <p>By default, the serializer uses "UTF-8" for string/byte conversion.
+ */
+@PublicEvolving
+public class SimpleStringSchema implements DeserializationSchema<String>, 
SerializationSchema<String> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The charset to use to convert between strings and bytes.
+        * The field is transient because the charset's name is written to the 
stream instead (see writeObject / readObject). */
+       private transient Charset charset;
+
+       /**
+        * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
+        */
+       public SimpleStringSchema() {
+               this(StandardCharsets.UTF_8);
+       }
+
+       /**
+        * Creates a new SimpleStringSchema that uses the given charset to 
convert between strings and bytes.
+        *
+        * @param charset The charset to use to convert between strings and 
bytes. Must not be null.
+        */
+       public SimpleStringSchema(Charset charset) {
+               this.charset = checkNotNull(charset);
+       }
+
+       /**
+        * Gets the charset used by this schema for serialization.
+        * @return The charset used by this schema for serialization.
+        */
+       public Charset getCharset() {
+               return charset;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Serialization / Deserialization Schema Methods
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String deserialize(byte[] message) {
+               return new String(message, charset);
+       }
+
+       @Override
+       public boolean isEndOfStream(String nextElement) {
+               return false;
+       }
+
+       @Override
+       public byte[] serialize(String element) {
+               return element.getBytes(charset);
+       }
+
+       @Override
+       public TypeInformation<String> getProducedType() {
+               return BasicTypeInfo.STRING_TYPE_INFO;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Java Serialization (the transient charset is written and restored by name)
+       // 
------------------------------------------------------------------------
+
+       private void writeObject (ObjectOutputStream out) throws IOException {
+               out.defaultWriteObject();
+               out.writeUTF(charset.name());
+       }
+
+       private void readObject(java.io.ObjectInputStream in) throws 
IOException, ClassNotFoundException {
+               in.defaultReadObject();
+               String charsetName = in.readUTF();
+               this.charset = Charset.forName(charsetName);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
new file mode 100644
index 0000000..217a889
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import java.io.IOException;
+
+/**
+ * A serialization and deserialization schema that uses Flink's serialization 
stack to
+ * transform types from and to byte arrays.
+ *
+ * @param <T> The type to be serialized.
+ */
+@Public
+public class TypeInformationSerializationSchema<T> implements 
DeserializationSchema<T>, SerializationSchema<T> {
+
+       private static final long serialVersionUID = -5359448468131559102L;
+
+       /** The serializer for the actual de-/serialization. */
+       private final TypeSerializer<T> serializer;
+
+       /** The reusable output serialization buffer. */
+       private transient DataOutputSerializer dos;
+
+       /** The reusable input deserialization buffer. */
+       private transient DataInputDeserializer dis;
+
+       /**
+        * The type information, to be returned by {@link #getProducedType()}. 
It is transient, because
+        * it is not serializable. Note that this means that the type 
information is not available at
+        * runtime, but only prior to the first serialization / deserialization.
+        */
+       private transient TypeInformation<T> typeInfo;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new de-/serialization schema for the given type.
+        *
+        * @param typeInfo The type information for the type de-/serialized by 
this schema.
+        * @param ec The execution config, which is used to parametrize the 
type serializers.
+        */
+       public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, 
ExecutionConfig ec) {
+               this.typeInfo = typeInfo;
+               this.serializer = typeInfo.createSerializer(ec);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public T deserialize(byte[] message) {
+               if (dis != null) {
+                       dis.setBuffer(message, 0, message.length);
+               } else {
+                       dis = new DataInputDeserializer(message, 0, 
message.length);
+               }
+
+               try {
+                       return serializer.deserialize(dis);
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Unable to deserialize 
message", e);
+               }
+       }
+
+       /**
+        * This schema never considers an element to signal end-of-stream, so 
this method always returns false.
+        * @param nextElement The element to test for the end-of-stream signal.
+        * @return Returns false.
+        */
+       @Override
+       public boolean isEndOfStream(T nextElement) {
+               return false;
+       }
+
+       /**
+        * Serializes the given element with this schema's type serializer.
+        *
+        * <p>The returned array is always a freshly allocated copy of the written bytes.
+        * The output buffer is reused across calls, so its backing array must never be
+        * handed out directly: a later call would overwrite a result returned earlier.
+        *
+        * @param element The element to serialize.
+        * @return A new array containing exactly the serialized bytes of the element.
+        * @throws RuntimeException If the serializer fails with an {@link IOException}.
+        */
+       @Override
+       public byte[] serialize(T element) {
+               if (dos == null) {
+                       dos = new DataOutputSerializer(16);
+               }
+
+               try {
+                       serializer.serialize(element, dos);
+
+                       // NOTE(review): getByteArray() is assumed to expose the reusable
+                       // backing array (the previous length check implies it can be longer
+                       // than the written data), so copy unconditionally - also when the
+                       // buffer happens to be filled exactly.
+                       final int written = dos.length();
+                       final byte[] result = new byte[written];
+                       System.arraycopy(dos.getByteArray(), 0, result, 0, written);
+                       return result;
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Unable to serialize record", e);
+               }
+               finally {
+                       // reset even on failure, so that partially written bytes of a
+                       // failed record cannot end up in front of the next record
+                       dos.clear();
+               }
+       }
+
+       @Override
+       public TypeInformation<T> getProducedType() {
+               if (typeInfo != null) {
+                       return typeInfo;
+               }
+               else {
+                       throw new IllegalStateException(
+                                       "The type information is not available 
after this class has been serialized and distributed.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
new file mode 100644
index 0000000..ec241b4
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.JSONPObject;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link AbstractDeserializationSchema}.
+ */
+@SuppressWarnings("serial")
+public class AbstractDeserializationSchemaTest {
+
+       @Test
+       public void testTypeExtractionTuple() {
+               TypeInformation<Tuple2<byte[], byte[]>> type = new 
TupleSchema().getProducedType();
+               TypeInformation<Tuple2<byte[], byte[]>> expected = 
TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){});
+               assertEquals(expected, type);
+       }
+
+       @Test
+       public void testTypeExtractionTupleAnonymous() {
+               TypeInformation<Tuple2<byte[], byte[]>> type = new 
AbstractDeserializationSchema<Tuple2<byte[], byte[]>>() {
+                       @Override
+                       public Tuple2<byte[], byte[]> deserialize(byte[] 
message) throws IOException {
+                               throw new UnsupportedOperationException();
+                       }
+               }.getProducedType();
+
+               TypeInformation<Tuple2<byte[], byte[]>> expected = 
TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){});
+               assertEquals(expected, type);
+       }
+
+       @Test
+       public void testTypeExtractionGeneric() {
+               TypeInformation<JSONPObject> type = new 
JsonSchema().getProducedType();
+               TypeInformation<JSONPObject> expected = TypeInformation.of(new 
TypeHint<JSONPObject>(){});
+               assertEquals(expected, type);
+       }
+
+       @Test
+       public void testTypeExtractionGenericAnonymous() {
+               TypeInformation<JSONPObject> type = new 
AbstractDeserializationSchema<JSONPObject>() {
+                       @Override
+                       public JSONPObject deserialize(byte[] message) throws 
IOException {
+                               throw new UnsupportedOperationException();
+                       }
+               }.getProducedType();
+
+               TypeInformation<JSONPObject> expected = TypeInformation.of(new 
TypeHint<JSONPObject>(){});
+               assertEquals(expected, type);
+       }
+
+       @Test
+       public void testTypeExtractionRawException() {
+               try {
+                       new RawSchema().getProducedType();
+                       fail();
+               } catch (InvalidTypesException e) {
+                       // expected
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Test types
+       // 
------------------------------------------------------------------------
+
+       private static class TupleSchema extends 
AbstractDeserializationSchema<Tuple2<byte[], byte[]>> {
+
+               @Override
+               public Tuple2<byte[], byte[]> deserialize(byte[] message) 
throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       private static class JsonSchema extends 
AbstractDeserializationSchema<JSONPObject> {
+
+               @Override
+               public JSONPObject deserialize(byte[] message) throws 
IOException {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       @SuppressWarnings("rawtypes")
+       private static class RawSchema extends AbstractDeserializationSchema {
+
+               @Override
+               public Object deserialize(byte[] message) throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java
new file mode 100644
index 0000000..482ff13
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link SimpleStringSchema}.
+ */
+public class SimpleStringSchemaTest {
+
+       @Test
+       public void testSerializationWithAnotherCharset() {
+               final Charset charset = StandardCharsets.UTF_16BE;
+               final String string = "之掃描古籍版實乃姚鼐的";
+               final byte[] bytes = string.getBytes(charset);
+
+               assertArrayEquals(bytes, new 
SimpleStringSchema(charset).serialize(string));
+               assertEquals(string, new 
SimpleStringSchema(charset).deserialize(bytes));
+       }
+
+       @Test
+       public void testSerializability() throws Exception {
+               final SimpleStringSchema schema = new 
SimpleStringSchema(StandardCharsets.UTF_16LE);
+               final SimpleStringSchema copy = 
CommonTestUtils.createCopySerializable(schema);
+
+               assertEquals(schema.getCharset(), copy.getCharset());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java
new file mode 100644
index 0000000..ef5f4b0
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link TypeInformationSerializationSchema}.
+ */
+public class TypeInformationSerializationSchemaTest {
+
+       @Test
+       public void testDeSerialization() {
+               try {
+                       TypeInformation<MyPOJO> info = 
TypeExtractor.getForClass(MyPOJO.class);
+
+                       TypeInformationSerializationSchema<MyPOJO> schema =
+                                       new 
TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig());
+
+                       MyPOJO[] types = {
+                                       new MyPOJO(72, new Date(763784523L), 
new Date(88234L)),
+                                       new MyPOJO(-1, new 
Date(11111111111111L)),
+                                       new MyPOJO(42),
+                                       new MyPOJO(17, new Date(222763784523L))
+                       };
+
+                       for (MyPOJO val : types) {
+                               byte[] serialized = schema.serialize(val);
+                               MyPOJO deser = schema.deserialize(serialized);
+                               assertEquals(val, deser);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testSerializability() {
+               try {
+                       TypeInformation<MyPOJO> info = 
TypeExtractor.getForClass(MyPOJO.class);
+                       TypeInformationSerializationSchema<MyPOJO> schema =
+                                       new 
TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig());
+
+                       // this needs to succeed
+                       CommonTestUtils.createCopySerializable(schema);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Test data types
+       // 
------------------------------------------------------------------------
+
+       private static class MyPOJO {
+
+               public int aField;
+               public List<Date> aList;
+
+               public MyPOJO() {}
+
+               public MyPOJO(int iVal, Date... dates) {
+                       this.aField = iVal;
+                       this.aList = new ArrayList<>(Arrays.asList(dates));
+               }
+
+               @Override
+               public int hashCode() {
+                       return aField;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj instanceof MyPOJO) {
+                               MyPOJO that = (MyPOJO) obj;
+                               return this.aField == that.aField && 
(this.aList == null ?
+                                               that.aList == null :
+                                               that.aList != null && 
this.aList.equals(that.aList));
+                       }
+                       return super.equals(obj);
+               }
+
+               @Override
+               public String toString() {
+                       return "MyPOJO " + aField + " " + aList;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
index b5abbc5..3fbd2b4 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
@@ -19,12 +19,12 @@ package org.apache.flink.streaming.examples.kafka;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
index 2a52811..9f4fdc4 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
+++ 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
@@ -19,10 +19,10 @@
 package org.apache.flink.streaming.scala.examples.kafka
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.common.serialization.SimpleStringSchema
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, 
FlinkKafkaProducer010}
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema
 
 /**
  * An example that shows how to read from and write to Kafka. This will read 
String messages

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index 5b15b5a..2683546 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -77,12 +77,6 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-shaded-jackson</artifactId>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-test-utils-junit</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d0769c6..2274968 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -91,7 +92,6 @@ import 
org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 214d5c2..80e0dbe 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -18,8 +18,8 @@
 package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.SerializableObject;
 
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
index 02ea004..7d30f91 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import java.io.IOException;
+import org.apache.flink.annotation.PublicEvolving;
 
 /**
  * The deserialization schema describes how to turn the byte messages 
delivered by certain
@@ -32,37 +29,15 @@ import java.io.IOException;
  * automatically by extracting it from the generic class arguments.
  *
  * @param <T> The type created by the deserialization schema.
+ *
+ * @deprecated Use {@link 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema} 
instead.
  */
-public abstract class AbstractDeserializationSchema<T> implements 
DeserializationSchema<T> {
+@Deprecated
+@PublicEvolving
+@SuppressWarnings("deprecation")
+public abstract class AbstractDeserializationSchema<T>
+               extends 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema<T>
+               implements DeserializationSchema<T> {
 
        private static final long serialVersionUID = 1L;
-
-       /**
-        * De-serializes the byte message.
-        *
-        * @param message The message, as a byte array.
-        * @return The de-serialized message as an object.
-        */
-       @Override
-       public abstract T deserialize(byte[] message) throws IOException;
-
-       /**
-        * Method to decide whether the element signals the end of the stream. 
If
-        * true is returned the element won't be emitted.
-        *
-        * <p>This default implementation returns always false, meaning the 
stream is interpreted
-        * to be unbounded.
-        *
-        * @param nextElement The element to test for the end-of-stream signal.
-        * @return True, if the element signals end of stream, false otherwise.
-        */
-       @Override
-       public boolean isEndOfStream(T nextElement) {
-               return false;
-       }
-
-       @Override
-       public TypeInformation<T> getProducedType() {
-               return 
TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 
0, null, null);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index 15ecb2c..cbaa004 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -32,9 +32,15 @@ import java.io.Serializable;
  * takes care of producing the return type information automatically.
  *
  * @param <T> The type created by the deserialization schema.
+ *
+ * @deprecated Use {@link 
org.apache.flink.api.common.serialization.DeserializationSchema} instead.
  */
 @Public
-public interface DeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
+@Deprecated
+public interface DeserializationSchema<T> extends
+               
org.apache.flink.api.common.serialization.DeserializationSchema<T>,
+               Serializable,
+               ResultTypeQueryable<T> {
 
        /**
         * Deserializes the byte message.
@@ -43,6 +49,7 @@ public interface DeserializationSchema<T> extends 
Serializable, ResultTypeQuerya
         *
         * @return The deserialized message as an object (null if the message 
cannot be deserialized).
         */
+       @Override
        T deserialize(byte[] message) throws IOException;
 
        /**
@@ -52,5 +59,6 @@ public interface DeserializationSchema<T> extends 
Serializable, ResultTypeQuerya
         * @param nextElement The element to test for the end-of-stream signal.
         * @return True, if the element signals end of stream, false otherwise.
         */
+       @Override
        boolean isEndOfStream(T nextElement);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
index 986cfb3..c7c1de0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
@@ -27,9 +27,13 @@ import java.io.Serializable;
  * to them in a specific format (for example as byte strings).
  *
  * @param <T> The type to be serialized.
+ *
+ * @deprecated Use {@link 
org.apache.flink.api.common.serialization.SerializationSchema} instead.
  */
 @Public
-public interface SerializationSchema<T> extends Serializable {
+@Deprecated
+public interface SerializationSchema<T>
+               extends 
org.apache.flink.api.common.serialization.SerializationSchema<T>, Serializable {
 
        /**
         * Serializes the incoming element to a specified type.
@@ -38,5 +42,6 @@ public interface SerializationSchema<T> extends Serializable {
         *            The incoming element to be serialized
         * @return The serialized element.
         */
+       @Override
        byte[] serialize(T element);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 27ba9e9..01ce30d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -18,35 +18,27 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 
-import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Very simple serialization schema for strings.
  *
  * <p>By default, the serializer uses "UTF-8" for string/byte conversion.
+ *
+ * @deprecated Use {@link 
org.apache.flink.api.common.serialization.SimpleStringSchema} instead.
  */
 @PublicEvolving
-public class SimpleStringSchema implements DeserializationSchema<String>, 
SerializationSchema<String> {
+@Deprecated
+@SuppressWarnings("deprecation")
+public class SimpleStringSchema
+               extends 
org.apache.flink.api.common.serialization.SimpleStringSchema
+               implements SerializationSchema<String>, 
DeserializationSchema<String> {
 
        private static final long serialVersionUID = 1L;
 
-       /** The charset to use to convert between strings and bytes.
-        * The field is transient because we serialize a different delegate 
object instead */
-       private transient Charset charset;
-
-       /**
-        * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
-        */
        public SimpleStringSchema() {
-               this(StandardCharsets.UTF_8);
+               super();
        }
 
        /**
@@ -55,53 +47,6 @@ public class SimpleStringSchema implements 
DeserializationSchema<String>, Serial
         * @param charset The charset to use to convert between strings and 
bytes.
         */
        public SimpleStringSchema(Charset charset) {
-               this.charset = checkNotNull(charset);
-       }
-
-       /**
-        * Gets the charset used by this schema for serialization.
-        * @return The charset used by this schema for serialization.
-        */
-       public Charset getCharset() {
-               return charset;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Kafka Serialization
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String deserialize(byte[] message) {
-               return new String(message, charset);
-       }
-
-       @Override
-       public boolean isEndOfStream(String nextElement) {
-               return false;
-       }
-
-       @Override
-       public byte[] serialize(String element) {
-               return element.getBytes(charset);
-       }
-
-       @Override
-       public TypeInformation<String> getProducedType() {
-               return BasicTypeInfo.STRING_TYPE_INFO;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Java Serialization
-       // 
------------------------------------------------------------------------
-
-       private void writeObject (ObjectOutputStream out) throws IOException {
-               out.defaultWriteObject();
-               out.writeUTF(charset.name());
-       }
-
-       private void readObject(java.io.ObjectInputStream in) throws 
IOException, ClassNotFoundException {
-               in.defaultReadObject();
-               String charsetName = in.readUTF();
-               this.charset = Charset.forName(charsetName);
+               super(charset);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
index 1c50dc2..b771fe0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
@@ -21,41 +21,24 @@ package org.apache.flink.streaming.util.serialization;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
-
-import java.io.IOException;
 
 /**
  * A serialization and deserialization schema that uses Flink's serialization 
stack to
  * transform typed from and to byte arrays.
  *
  * @param <T> The type to be serialized.
+ *
+ * @deprecated Use {@link 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema} 
instead.
  */
 @Public
-public class TypeInformationSerializationSchema<T> implements 
DeserializationSchema<T>, SerializationSchema<T> {
+@Deprecated
+@SuppressWarnings("deprecation")
+public class TypeInformationSerializationSchema<T>
+               extends 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema<T>
+               implements DeserializationSchema<T>, SerializationSchema<T> {
 
        private static final long serialVersionUID = -5359448468131559102L;
 
-       /** The serializer for the actual de-/serialization. */
-       private final TypeSerializer<T> serializer;
-
-       /** The reusable output serialization buffer. */
-       private transient DataOutputSerializer dos;
-
-       /** The reusable input deserialization buffer. */
-       private transient DataInputDeserializer dis;
-
-       /**
-        * The type information, to be returned by {@link #getProducedType()}. 
It is transient, because
-        * it is not serializable. Note that this means that the type 
information is not available at
-        * runtime, but only prior to the first serialization / deserialization.
-        */
-       private transient TypeInformation<T> typeInfo;
-
-       // 
------------------------------------------------------------------------
-
        /**
         * Creates a new de-/serialization schema for the given type.
         *
@@ -63,69 +46,6 @@ public class TypeInformationSerializationSchema<T> 
implements DeserializationSch
         * @param ec The execution config, which is used to parametrize the 
type serializers.
         */
        public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, 
ExecutionConfig ec) {
-               this.typeInfo = typeInfo;
-               this.serializer = typeInfo.createSerializer(ec);
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public T deserialize(byte[] message) {
-               if (dis != null) {
-                       dis.setBuffer(message, 0, message.length);
-               } else {
-                       dis = new DataInputDeserializer(message, 0, 
message.length);
-               }
-
-               try {
-                       return serializer.deserialize(dis);
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Unable to deserialize 
message", e);
-               }
-       }
-
-       /**
-        * This schema never considers an element to signal end-of-stream, so 
this method returns always false.
-        * @param nextElement The element to test for the end-of-stream signal.
-        * @return Returns false.
-        */
-       @Override
-       public boolean isEndOfStream(T nextElement) {
-               return false;
-       }
-
-       @Override
-       public byte[] serialize(T element) {
-               if (dos == null) {
-                       dos = new DataOutputSerializer(16);
-               }
-
-               try {
-                       serializer.serialize(element, dos);
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Unable to serialize 
record", e);
-               }
-
-               byte[] ret = dos.getByteArray();
-               if (ret.length != dos.length()) {
-                       byte[] n = new byte[dos.length()];
-                       System.arraycopy(ret, 0, n, 0, dos.length());
-                       ret = n;
-               }
-               dos.clear();
-               return ret;
-       }
-
-       @Override
-       public TypeInformation<T> getProducedType() {
-               if (typeInfo != null) {
-                       return typeInfo;
-               }
-               else {
-                       throw new IllegalStateException(
-                                       "The type information is not available 
after this class has been serialized and distributed.");
-               }
+               super(typeInfo, ec);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index 6cdce11..b3c4ee9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.api.functions.sink;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.commons.io.IOUtils;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
deleted file mode 100644
index 818a43b..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import 
org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
-
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.JSONPObject;
-
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link AbstractDeserializationSchema}.
- */
-@SuppressWarnings("serial")
-public class AbstractDeserializationSchemaTest {
-
-       @Test
-       public void testTypeExtractionTuple() {
-               TypeInformation<Tuple2<byte[], byte[]>> type = new 
TupleSchema().getProducedType();
-               TypeInformation<Tuple2<byte[], byte[]>> expected = 
TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){});
-               assertEquals(expected, type);
-       }
-
-       @Test
-       public void testTypeExtractionTupleAnonymous() {
-               TypeInformation<Tuple2<byte[], byte[]>> type = new 
AbstractDeserializationSchema<Tuple2<byte[], byte[]>>() {
-                       @Override
-                       public Tuple2<byte[], byte[]> deserialize(byte[] 
message) throws IOException {
-                               throw new UnsupportedOperationException();
-                       }
-               }.getProducedType();
-
-               TypeInformation<Tuple2<byte[], byte[]>> expected = 
TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){});
-               assertEquals(expected, type);
-       }
-
-       @Test
-       public void testTypeExtractionGeneric() {
-               TypeInformation<JSONPObject> type = new 
JsonSchema().getProducedType();
-               TypeInformation<JSONPObject> expected = TypeInformation.of(new 
TypeHint<JSONPObject>(){});
-               assertEquals(expected, type);
-       }
-
-       @Test
-       public void testTypeExtractionGenericAnonymous() {
-               TypeInformation<JSONPObject> type = new 
AbstractDeserializationSchema<JSONPObject>() {
-                       @Override
-                       public JSONPObject deserialize(byte[] message) throws 
IOException {
-                               throw new UnsupportedOperationException();
-                       }
-               }.getProducedType();
-
-               TypeInformation<JSONPObject> expected = TypeInformation.of(new 
TypeHint<JSONPObject>(){});
-               assertEquals(expected, type);
-       }
-
-       @Test
-       public void testTypeExtractionRawException() {
-               try {
-                       new RawSchema().getProducedType();
-                       fail();
-               } catch (InvalidTypesException e) {
-                       // expected
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Test types
-       // 
------------------------------------------------------------------------
-
-       private static class TupleSchema extends 
AbstractDeserializationSchema<Tuple2<byte[], byte[]>> {
-
-               @Override
-               public Tuple2<byte[], byte[]> deserialize(byte[] message) 
throws IOException {
-                       throw new UnsupportedOperationException();
-               }
-       }
-
-       private static class JsonSchema extends 
AbstractDeserializationSchema<JSONPObject> {
-
-               @Override
-               public JSONPObject deserialize(byte[] message) throws 
IOException {
-                       throw new UnsupportedOperationException();
-               }
-       }
-
-       @SuppressWarnings("rawtypes")
-       private static class RawSchema extends AbstractDeserializationSchema {
-
-               @Override
-               public Object deserialize(byte[] message) throws IOException {
-                       throw new UnsupportedOperationException();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
deleted file mode 100644
index 317f2e3..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link TypeInformationSerializationSchema}.
- */
-public class TypeInformationSerializationSchemaTest {
-
-       @Test
-       public void testDeSerialization() {
-               try {
-                       TypeInformation<MyPOJO> info = 
TypeExtractor.getForClass(MyPOJO.class);
-
-                       TypeInformationSerializationSchema<MyPOJO> schema =
-                                       new 
TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig());
-
-                       MyPOJO[] types = {
-                                       new MyPOJO(72, new Date(763784523L), 
new Date(88234L)),
-                                       new MyPOJO(-1, new 
Date(11111111111111L)),
-                                       new MyPOJO(42),
-                                       new MyPOJO(17, new Date(222763784523L))
-                       };
-
-                       for (MyPOJO val : types) {
-                               byte[] serialized = schema.serialize(val);
-                               MyPOJO deser = schema.deserialize(serialized);
-                               assertEquals(val, deser);
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testSerializability() {
-               try {
-                       TypeInformation<MyPOJO> info = 
TypeExtractor.getForClass(MyPOJO.class);
-                       TypeInformationSerializationSchema<MyPOJO> schema =
-                                       new 
TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig());
-
-                       // this needs to succeed
-                       CommonTestUtils.createCopySerializable(schema);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Test data types
-       // 
------------------------------------------------------------------------
-
-       private static class MyPOJO {
-
-               public int aField;
-               public List<Date> aList;
-
-               public MyPOJO() {}
-
-               public MyPOJO(int iVal, Date... dates) {
-                       this.aField = iVal;
-                       this.aList = new ArrayList<>(Arrays.asList(dates));
-               }
-
-               @Override
-               public int hashCode() {
-                       return aField;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj instanceof MyPOJO) {
-                               MyPOJO that = (MyPOJO) obj;
-                               return this.aField == that.aField && 
(this.aList == null ?
-                                               that.aList == null :
-                                               that.aList != null && 
this.aList.equals(that.aList));
-                       }
-                       return super.equals(obj);
-               }
-
-               @Override
-               public String toString() {
-                       return "MyPOJO " + aField + " " + aList;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
deleted file mode 100644
index 6081ed1..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.core.testutils.CommonTestUtils;
-
-import org.junit.Test;
-
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the {@link SimpleStringSchema}.
- */
-public class SimpleStringSchemaTest {
-
-       @Test
-       public void testSerializationWithAnotherCharset() {
-               final Charset charset = StandardCharsets.UTF_16BE;
-               final String string = "之掃描古籍版實乃姚鼐的";
-               final byte[] bytes = string.getBytes(charset);
-
-               assertArrayEquals(bytes, new 
SimpleStringSchema(charset).serialize(string));
-               assertEquals(string, new 
SimpleStringSchema(charset).deserialize(bytes));
-       }
-
-       @Test
-       public void testSerializability() throws Exception {
-               final SimpleStringSchema schema = new 
SimpleStringSchema(StandardCharsets.UTF_16LE);
-               final SimpleStringSchema copy = 
CommonTestUtils.createCopySerializable(schema);
-
-               assertEquals(schema.getCharset(), copy.getCharset());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index b5a7cd6..ef2e741 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, 
MapFunction, Partitioner}
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.operators.ResourceSpec
+import org.apache.flink.api.common.serialization.SerializationSchema
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
@@ -38,7 +39,6 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, 
TimeWindow, Window}
-import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
 
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
index 3b47429..991f241 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
@@ -18,8 +18,8 @@
 package org.apache.flink.streaming.api.scala
 
 
+import org.apache.flink.api.common.serialization.SimpleStringSchema
 import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema
 
 import scala.language.existentials
 

Reply via email to