http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index 30f6dc2..de72985 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.ConfigConstants; @@ -32,7 +33,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.InstantiationUtil; import org.junit.AfterClass;
http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index dcf3167..3138152 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -18,12 +18,12 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.table.api.Types; import org.apache.flink.types.Row; http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java index 218401c..7a882f4 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 21171f8..6851474 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -17,17 +17,18 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.networking.NetworkFailuresProxy; import 
org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import kafka.server.KafkaServer; + import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java index b204ea9..e432a65 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java @@ -21,6 +21,8 @@ package org.apache.flink.streaming.connectors.kafka.testutils; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; import 
org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.streaming.api.datastream.DataStream; @@ -36,8 +38,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import java.util.Collection; import java.util.Properties; http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java index b3c6f8c..c1118ed 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java @@ -17,10 +17,10 @@ package org.apache.flink.streaming.connectors.rabbitmq; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; 
http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 5018bcf..d454153 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.rabbitmq; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; @@ -25,7 +26,6 @@ import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourc import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.util.Preconditions; import com.rabbitmq.client.Channel; http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java index 93f884b..53b834d 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java @@ -17,10 +17,10 @@ package org.apache.flink.streaming.connectors.rabbitmq; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.SinkContextUtil; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java index 0996355..bbf893f 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.rabbitmq; import org.apache.flink.api.common.functions.RuntimeContext; +import 
org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; @@ -32,7 +33,6 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 5b97273..ae3f56e 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -107,6 +107,9 @@ under the License. <scope>test</scope> </dependency> + <!-- Joda and jackson are used to test that serialization and type extraction + work with types from those libraries --> + <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> @@ -118,6 +121,13 @@ under the License. <artifactId>joda-convert</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-jackson</artifactId> + <scope>test</scope> + </dependency> + </dependencies> <profiles> @@ -208,6 +218,11 @@ under the License. 
<exclude>org.apache.flink.configuration.ConfigConstants#ENABLE_QUARANTINE_MONITOR</exclude> <exclude>org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_INITIAL_KEY</exclude> <exclude>org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_MAX_KEY</exclude> + + <!-- apparently there is a bug in the plugin which makes it fail on this new file, even though + it's new, and not conflicting/breaking --> + <exclude>org.apache.flink.api.common.serialization.DeserializationSchema</exclude> + <exclude>org.apache.flink.api.common.serialization.SerializationSchema</exclude> </excludes> </parameter> </configuration> http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java new file mode 100644 index 0000000..871b7b1 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import java.io.IOException; + +/** + * The deserialization schema describes how to turn the byte messages delivered by certain + * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are + * processed by Flink. + * + * <p>This base variant of the deserialization schema produces the type information + * automatically by extracting it from the generic class arguments. + * + * @param <T> The type created by the deserialization schema. + */ +@PublicEvolving +public abstract class AbstractDeserializationSchema<T> implements DeserializationSchema<T> { + + private static final long serialVersionUID = 1L; + + /** + * De-serializes the byte message. + * + * @param message The message, as a byte array. + * @return The de-serialized message as an object. + */ + @Override + public abstract T deserialize(byte[] message) throws IOException; + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted. + * + * <p>This default implementation returns always false, meaning the stream is interpreted + * to be unbounded. + * + * @param nextElement The element to test for the end-of-stream signal. + * @return True, if the element signals end of stream, false otherwise. 
+ */ + @Override + public boolean isEndOfStream(T nextElement) { + return false; + } + + @Override + public TypeInformation<T> getProducedType() { + return TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 0, null, null); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java new file mode 100644 index 0000000..9de4743 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + +import java.io.IOException; +import java.io.Serializable; + +/** + * The deserialization schema describes how to turn the byte messages delivered by certain + * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are + * processed by Flink. + * + * <p>Note: In most cases, one should start from {@link AbstractDeserializationSchema}, which + * takes care of producing the return type information automatically. + * + * @param <T> The type created by the deserialization schema. + */ +@Public +public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + + /** + * Deserializes the byte message. + * + * @param message The message, as a byte array. + * + * @return The deserialized message as an object (null if the message cannot be deserialized). + */ + T deserialize(byte[] message) throws IOException; + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted. + * + * @param nextElement The element to test for the end-of-stream signal. + * @return True, if the element signals end of stream, false otherwise. 
+ */ + boolean isEndOfStream(T nextElement); +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java new file mode 100644 index 0000000..3a4eaeb --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +import java.io.Serializable; + +/** + * The serialization schema describes how to turn a data object into a different serialized + * representation. Most data sinks (for example Apache Kafka) require the data to be handed + * to them in a specific format (for example as byte strings). + * + * @param <T> The type to be serialized. 
+ */ +@Public +public interface SerializationSchema<T> extends Serializable { + + /** + * Serializes the incoming element to a specified type. + * + * @param element + * The incoming element to be serialized + * @return The serialized element. + */ + byte[] serialize(T element); +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java new file mode 100644 index 0000000..3130a10 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Very simple serialization schema for strings. + * + * <p>By default, the serializer uses "UTF-8" for string/byte conversion. + */ +@PublicEvolving +public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> { + + private static final long serialVersionUID = 1L; + + /** The charset to use to convert between strings and bytes. + * The field is transient because we serialize a different delegate object instead */ + private transient Charset charset; + + /** + * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding. + */ + public SimpleStringSchema() { + this(StandardCharsets.UTF_8); + } + + /** + * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes. + * + * @param charset The charset to use to convert between strings and bytes. + */ + public SimpleStringSchema(Charset charset) { + this.charset = checkNotNull(charset); + } + + /** + * Gets the charset used by this schema for serialization. + * @return The charset used by this schema for serialization. 
+ */ + public Charset getCharset() { + return charset; + } + + // ------------------------------------------------------------------------ + // Kafka Serialization + // ------------------------------------------------------------------------ + + @Override + public String deserialize(byte[] message) { + return new String(message, charset); + } + + @Override + public boolean isEndOfStream(String nextElement) { + return false; + } + + @Override + public byte[] serialize(String element) { + return element.getBytes(charset); + } + + @Override + public TypeInformation<String> getProducedType() { + return BasicTypeInfo.STRING_TYPE_INFO; + } + + // ------------------------------------------------------------------------ + // Java Serialization + // ------------------------------------------------------------------------ + + private void writeObject (ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + out.writeUTF(charset.name()); + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + String charsetName = in.readUTF(); + this.charset = Charset.forName(charsetName); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java new file mode 100644 index 0000000..217a889 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +import java.io.IOException; + +/** + * A serialization and deserialization schema that uses Flink's serialization stack to + * transform types from and to byte arrays. + * + * @param <T> The type to be serialized. + */ +@Public +public class TypeInformationSerializationSchema<T> implements DeserializationSchema<T>, SerializationSchema<T> { + + private static final long serialVersionUID = -5359448468131559102L; + + /** The serializer for the actual de-/serialization. */ + private final TypeSerializer<T> serializer; + + /** The reusable output serialization buffer. */ + private transient DataOutputSerializer dos; + + /** The reusable input deserialization buffer. */ + private transient DataInputDeserializer dis; + + /** + * The type information, to be returned by {@link #getProducedType()}. It is transient, because + * it is not serializable. 
Note that this means that the type information is not available at + * runtime, but only prior to the first serialization / deserialization. + */ + private transient TypeInformation<T> typeInfo; + + // ------------------------------------------------------------------------ + + /** + * Creates a new de-/serialization schema for the given type. + * + * @param typeInfo The type information for the type de-/serialized by this schema. + * @param ec The execution config, which is used to parametrize the type serializers. + */ + public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, ExecutionConfig ec) { + this.typeInfo = typeInfo; + this.serializer = typeInfo.createSerializer(ec); + } + + // ------------------------------------------------------------------------ + + @Override + public T deserialize(byte[] message) { + if (dis != null) { + dis.setBuffer(message, 0, message.length); + } else { + dis = new DataInputDeserializer(message, 0, message.length); + } + + try { + return serializer.deserialize(dis); + } + catch (IOException e) { + throw new RuntimeException("Unable to deserialize message", e); + } + } + + /** + * This schema never considers an element to signal end-of-stream, so this method returns always false. + * @param nextElement The element to test for the end-of-stream signal. + * @return Returns false. 
+ */ + @Override + public boolean isEndOfStream(T nextElement) { + return false; + } + + @Override + public byte[] serialize(T element) { + if (dos == null) { + dos = new DataOutputSerializer(16); + } + + try { + serializer.serialize(element, dos); + } + catch (IOException e) { + throw new RuntimeException("Unable to serialize record", e); + } + + byte[] ret = dos.getByteArray(); + if (ret.length != dos.length()) { + byte[] n = new byte[dos.length()]; + System.arraycopy(ret, 0, n, 0, dos.length()); + ret = n; + } + dos.clear(); + return ret; + } + + @Override + public TypeInformation<T> getProducedType() { + if (typeInfo != null) { + return typeInfo; + } + else { + throw new IllegalStateException( + "The type information is not available after this class has been serialized and distributed."); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java new file mode 100644 index 0000000..ec241b4 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.JSONPObject; + +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests for {@link AbstractDeserializationSchema}. 
+ */ +@SuppressWarnings("serial") +public class AbstractDeserializationSchemaTest { + + @Test + public void testTypeExtractionTuple() { + TypeInformation<Tuple2<byte[], byte[]>> type = new TupleSchema().getProducedType(); + TypeInformation<Tuple2<byte[], byte[]>> expected = TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){}); + assertEquals(expected, type); + } + + @Test + public void testTypeExtractionTupleAnonymous() { + TypeInformation<Tuple2<byte[], byte[]>> type = new AbstractDeserializationSchema<Tuple2<byte[], byte[]>>() { + @Override + public Tuple2<byte[], byte[]> deserialize(byte[] message) throws IOException { + throw new UnsupportedOperationException(); + } + }.getProducedType(); + + TypeInformation<Tuple2<byte[], byte[]>> expected = TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){}); + assertEquals(expected, type); + } + + @Test + public void testTypeExtractionGeneric() { + TypeInformation<JSONPObject> type = new JsonSchema().getProducedType(); + TypeInformation<JSONPObject> expected = TypeInformation.of(new TypeHint<JSONPObject>(){}); + assertEquals(expected, type); + } + + @Test + public void testTypeExtractionGenericAnonymous() { + TypeInformation<JSONPObject> type = new AbstractDeserializationSchema<JSONPObject>() { + @Override + public JSONPObject deserialize(byte[] message) throws IOException { + throw new UnsupportedOperationException(); + } + }.getProducedType(); + + TypeInformation<JSONPObject> expected = TypeInformation.of(new TypeHint<JSONPObject>(){}); + assertEquals(expected, type); + } + + @Test + public void testTypeExtractionRawException() { + try { + new RawSchema().getProducedType(); + fail(); + } catch (InvalidTypesException e) { + // expected + } + } + + // ------------------------------------------------------------------------ + // Test types + // ------------------------------------------------------------------------ + + private static class TupleSchema extends AbstractDeserializationSchema<Tuple2<byte[], 
byte[]>> { + + @Override + public Tuple2<byte[], byte[]> deserialize(byte[] message) throws IOException { + throw new UnsupportedOperationException(); + } + } + + private static class JsonSchema extends AbstractDeserializationSchema<JSONPObject> { + + @Override + public JSONPObject deserialize(byte[] message) throws IOException { + throw new UnsupportedOperationException(); + } + } + + @SuppressWarnings("rawtypes") + private static class RawSchema extends AbstractDeserializationSchema { + + @Override + public Object deserialize(byte[] message) throws IOException { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java new file mode 100644 index 0000000..482ff13 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.Test; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link SimpleStringSchema}. + */ +public class SimpleStringSchemaTest { + + @Test + public void testSerializationWithAnotherCharset() { + final Charset charset = StandardCharsets.UTF_16BE; + final String string = "之掃描古籍版實乃姚鼐的"; + final byte[] bytes = string.getBytes(charset); + + assertArrayEquals(bytes, new SimpleStringSchema(charset).serialize(string)); + assertEquals(string, new SimpleStringSchema(charset).deserialize(bytes)); + } + + @Test + public void testSerializability() throws Exception { + final SimpleStringSchema schema = new SimpleStringSchema(StandardCharsets.UTF_16LE); + final SimpleStringSchema copy = CommonTestUtils.createCopySerializable(schema); + + assertEquals(schema.getCharset(), copy.getCharset()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java new file mode 100644 index 0000000..ef5f4b0 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests for {@link TypeInformationSerializationSchema}. 
+ */ +public class TypeInformationSerializationSchemaTest { + + @Test + public void testDeSerialization() { + try { + TypeInformation<MyPOJO> info = TypeExtractor.getForClass(MyPOJO.class); + + TypeInformationSerializationSchema<MyPOJO> schema = + new TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig()); + + MyPOJO[] types = { + new MyPOJO(72, new Date(763784523L), new Date(88234L)), + new MyPOJO(-1, new Date(11111111111111L)), + new MyPOJO(42), + new MyPOJO(17, new Date(222763784523L)) + }; + + for (MyPOJO val : types) { + byte[] serialized = schema.serialize(val); + MyPOJO deser = schema.deserialize(serialized); + assertEquals(val, deser); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSerializability() { + try { + TypeInformation<MyPOJO> info = TypeExtractor.getForClass(MyPOJO.class); + TypeInformationSerializationSchema<MyPOJO> schema = + new TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig()); + + // this needs to succeed + CommonTestUtils.createCopySerializable(schema); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + // Test data types + // ------------------------------------------------------------------------ + + private static class MyPOJO { + + public int aField; + public List<Date> aList; + + public MyPOJO() {} + + public MyPOJO(int iVal, Date... dates) { + this.aField = iVal; + this.aList = new ArrayList<>(Arrays.asList(dates)); + } + + @Override + public int hashCode() { + return aField; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof MyPOJO) { + MyPOJO that = (MyPOJO) obj; + return this.aField == that.aField && (this.aList == null ? 
+ that.aList == null : + that.aList != null && this.aList.equals(that.aList)); + } + return super.equals(obj); + } + + @Override + public String toString() { + return "MyPOJO " + aField + " " + aList; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java index b5abbc5..3fbd2b4 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java @@ -19,12 +19,12 @@ package org.apache.flink.streaming.examples.kafka; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; /** http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala ---------------------------------------------------------------------- diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala index 2a52811..9f4fdc4 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala @@ -19,10 +19,10 @@ package org.apache.flink.streaming.scala.examples.kafka import org.apache.flink.api.common.restartstrategy.RestartStrategies +import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010} -import org.apache.flink.streaming.util.serialization.SimpleStringSchema /** * An example that shows how to read from and write to Kafka. This will read String messages http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml index 5b15b5a..2683546 100644 --- a/flink-streaming-java/pom.xml +++ b/flink-streaming-java/pom.xml @@ -77,12 +77,6 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-shaded-jackson</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils-junit</artifactId> <version>${project.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index d0769c6..2274968 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -31,6 +31,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -91,7 +92,6 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.util.keys.KeySelectorUtil; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.Preconditions; import java.util.ArrayList; 
http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java index 214d5c2..80e0dbe 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.api.functions.sink; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.SerializableObject; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java index 02ea004..7d30f91 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java @@ -18,10 +18,7 @@ package org.apache.flink.streaming.util.serialization; -import 
org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; - -import java.io.IOException; +import org.apache.flink.annotation.PublicEvolving; /** * The deserialization schema describes how to turn the byte messages delivered by certain @@ -32,37 +29,15 @@ import java.io.IOException; * automatically by extracting it from the generic class arguments. * * @param <T> The type created by the deserialization schema. + * + * @deprecated Use {@link org.apache.flink.api.common.serialization.AbstractDeserializationSchema} instead. */ -public abstract class AbstractDeserializationSchema<T> implements DeserializationSchema<T> { +@Deprecated +@PublicEvolving +@SuppressWarnings("deprecation") +public abstract class AbstractDeserializationSchema<T> + extends org.apache.flink.api.common.serialization.AbstractDeserializationSchema<T> + implements DeserializationSchema<T> { private static final long serialVersionUID = 1L; - - /** - * De-serializes the byte message. - * - * @param message The message, as a byte array. - * @return The de-serialized message as an object. - */ - @Override - public abstract T deserialize(byte[] message) throws IOException; - - /** - * Method to decide whether the element signals the end of the stream. If - * true is returned the element won't be emitted. - * - * <p>This default implementation returns always false, meaning the stream is interpreted - * to be unbounded. - * - * @param nextElement The element to test for the end-of-stream signal. - * @return True, if the element signals end of stream, false otherwise. 
- */ - @Override - public boolean isEndOfStream(T nextElement) { - return false; - } - - @Override - public TypeInformation<T> getProducedType() { - return TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 0, null, null); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java index 15ecb2c..cbaa004 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java @@ -32,9 +32,15 @@ import java.io.Serializable; * takes care of producing the return type information automatically. * * @param <T> The type created by the deserialization schema. + * + * @deprecated Use {@link org.apache.flink.api.common.serialization.DeserializationSchema} instead. */ @Public -public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { +@Deprecated +public interface DeserializationSchema<T> extends + org.apache.flink.api.common.serialization.DeserializationSchema<T>, + Serializable, + ResultTypeQueryable<T> { /** * Deserializes the byte message. @@ -43,6 +49,7 @@ public interface DeserializationSchema<T> extends Serializable, ResultTypeQuerya * * @return The deserialized message as an object (null if the message cannot be deserialized). 
*/ + @Override T deserialize(byte[] message) throws IOException; /** @@ -52,5 +59,6 @@ public interface DeserializationSchema<T> extends Serializable, ResultTypeQuerya * @param nextElement The element to test for the end-of-stream signal. * @return True, if the element signals end of stream, false otherwise. */ + @Override boolean isEndOfStream(T nextElement); } http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java index 986cfb3..c7c1de0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java @@ -27,9 +27,13 @@ import java.io.Serializable; * to them in a specific format (for example as byte strings). * * @param <T> The type to be serialized. + * + * @deprecated Use {@link org.apache.flink.api.common.serialization.SerializationSchema} instead. */ @Public -public interface SerializationSchema<T> extends Serializable { +@Deprecated +public interface SerializationSchema<T> + extends org.apache.flink.api.common.serialization.SerializationSchema<T>, Serializable { /** * Serializes the incoming element to a specified type. @@ -38,5 +42,6 @@ public interface SerializationSchema<T> extends Serializable { * The incoming element to be serialized * @return The serialized element. 
*/ + @Override byte[] serialize(T element); } http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java index 27ba9e9..01ce30d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java @@ -18,35 +18,27 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import java.io.IOException; -import java.io.ObjectOutputStream; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** * Very simple serialization schema for strings. * * <p>By default, the serializer uses "UTF-8" for string/byte conversion. + * + * @deprecated Use {@link org.apache.flink.api.common.serialization.SimpleStringSchema} instead. */ @PublicEvolving -public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> { +@Deprecated +@SuppressWarnings("deprecation") +public class SimpleStringSchema + extends org.apache.flink.api.common.serialization.SimpleStringSchema + implements SerializationSchema<String>, DeserializationSchema<String> { private static final long serialVersionUID = 1L; - /** The charset to use to convert between strings and bytes. 
- * The field is transient because we serialize a different delegate object instead */ - private transient Charset charset; - - /** - * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding. - */ public SimpleStringSchema() { - this(StandardCharsets.UTF_8); + super(); } /** @@ -55,53 +47,6 @@ public class SimpleStringSchema implements DeserializationSchema<String>, Serial * @param charset The charset to use to convert between strings and bytes. */ public SimpleStringSchema(Charset charset) { - this.charset = checkNotNull(charset); - } - - /** - * Gets the charset used by this schema for serialization. - * @return The charset used by this schema for serialization. - */ - public Charset getCharset() { - return charset; - } - - // ------------------------------------------------------------------------ - // Kafka Serialization - // ------------------------------------------------------------------------ - - @Override - public String deserialize(byte[] message) { - return new String(message, charset); - } - - @Override - public boolean isEndOfStream(String nextElement) { - return false; - } - - @Override - public byte[] serialize(String element) { - return element.getBytes(charset); - } - - @Override - public TypeInformation<String> getProducedType() { - return BasicTypeInfo.STRING_TYPE_INFO; - } - - // ------------------------------------------------------------------------ - // Java Serialization - // ------------------------------------------------------------------------ - - private void writeObject (ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - out.writeUTF(charset.name()); - } - - private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - String charsetName = in.readUTF(); - this.charset = Charset.forName(charsetName); + super(charset); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java index 1c50dc2..b771fe0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java @@ -21,41 +21,24 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputDeserializer; -import org.apache.flink.core.memory.DataOutputSerializer; - -import java.io.IOException; /** * A serialization and deserialization schema that uses Flink's serialization stack to * transform typed from and to byte arrays. * * @param <T> The type to be serialized. + * + * @deprecated Use {@link org.apache.flink.api.common.serialization.TypeInformationSerializationSchema} instead. 
*/ @Public -public class TypeInformationSerializationSchema<T> implements DeserializationSchema<T>, SerializationSchema<T> { +@Deprecated +@SuppressWarnings("deprecation") +public class TypeInformationSerializationSchema<T> + extends org.apache.flink.api.common.serialization.TypeInformationSerializationSchema<T> + implements DeserializationSchema<T>, SerializationSchema<T> { private static final long serialVersionUID = -5359448468131559102L; - /** The serializer for the actual de-/serialization. */ - private final TypeSerializer<T> serializer; - - /** The reusable output serialization buffer. */ - private transient DataOutputSerializer dos; - - /** The reusable input deserialization buffer. */ - private transient DataInputDeserializer dis; - - /** - * The type information, to be returned by {@link #getProducedType()}. It is transient, because - * it is not serializable. Note that this means that the type information is not available at - * runtime, but only prior to the first serialization / deserialization. - */ - private transient TypeInformation<T> typeInfo; - - // ------------------------------------------------------------------------ - /** * Creates a new de-/serialization schema for the given type. * @@ -63,69 +46,6 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch * @param ec The execution config, which is used to parametrize the type serializers. 
*/ public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, ExecutionConfig ec) { - this.typeInfo = typeInfo; - this.serializer = typeInfo.createSerializer(ec); - } - - // ------------------------------------------------------------------------ - - @Override - public T deserialize(byte[] message) { - if (dis != null) { - dis.setBuffer(message, 0, message.length); - } else { - dis = new DataInputDeserializer(message, 0, message.length); - } - - try { - return serializer.deserialize(dis); - } - catch (IOException e) { - throw new RuntimeException("Unable to deserialize message", e); - } - } - - /** - * This schema never considers an element to signal end-of-stream, so this method returns always false. - * @param nextElement The element to test for the end-of-stream signal. - * @return Returns false. - */ - @Override - public boolean isEndOfStream(T nextElement) { - return false; - } - - @Override - public byte[] serialize(T element) { - if (dos == null) { - dos = new DataOutputSerializer(16); - } - - try { - serializer.serialize(element, dos); - } - catch (IOException e) { - throw new RuntimeException("Unable to serialize record", e); - } - - byte[] ret = dos.getByteArray(); - if (ret.length != dos.length()) { - byte[] n = new byte[dos.length()]; - System.arraycopy(ret, 0, n, 0, dos.length()); - ret = n; - } - dos.clear(); - return ret; - } - - @Override - public TypeInformation<T> getProducedType() { - if (typeInfo != null) { - return typeInfo; - } - else { - throw new IllegalStateException( - "The type information is not available after this class has been serialized and distributed."); - } + super(typeInfo, ec); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java index 6cdce11..b3c4ee9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.api.functions.sink; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.TestLogger; import org.apache.commons.io.IOUtils; http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java deleted file mode 100644 index 818a43b..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import org.apache.flink.api.common.functions.InvalidTypesException; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.JSONPObject; - -import org.junit.Test; - -import java.io.IOException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * Tests for {@link AbstractDeserializationSchema}. 
- */ -@SuppressWarnings("serial") -public class AbstractDeserializationSchemaTest { - - @Test - public void testTypeExtractionTuple() { - TypeInformation<Tuple2<byte[], byte[]>> type = new TupleSchema().getProducedType(); - TypeInformation<Tuple2<byte[], byte[]>> expected = TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){}); - assertEquals(expected, type); - } - - @Test - public void testTypeExtractionTupleAnonymous() { - TypeInformation<Tuple2<byte[], byte[]>> type = new AbstractDeserializationSchema<Tuple2<byte[], byte[]>>() { - @Override - public Tuple2<byte[], byte[]> deserialize(byte[] message) throws IOException { - throw new UnsupportedOperationException(); - } - }.getProducedType(); - - TypeInformation<Tuple2<byte[], byte[]>> expected = TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){}); - assertEquals(expected, type); - } - - @Test - public void testTypeExtractionGeneric() { - TypeInformation<JSONPObject> type = new JsonSchema().getProducedType(); - TypeInformation<JSONPObject> expected = TypeInformation.of(new TypeHint<JSONPObject>(){}); - assertEquals(expected, type); - } - - @Test - public void testTypeExtractionGenericAnonymous() { - TypeInformation<JSONPObject> type = new AbstractDeserializationSchema<JSONPObject>() { - @Override - public JSONPObject deserialize(byte[] message) throws IOException { - throw new UnsupportedOperationException(); - } - }.getProducedType(); - - TypeInformation<JSONPObject> expected = TypeInformation.of(new TypeHint<JSONPObject>(){}); - assertEquals(expected, type); - } - - @Test - public void testTypeExtractionRawException() { - try { - new RawSchema().getProducedType(); - fail(); - } catch (InvalidTypesException e) { - // expected - } - } - - // ------------------------------------------------------------------------ - // Test types - // ------------------------------------------------------------------------ - - private static class TupleSchema extends AbstractDeserializationSchema<Tuple2<byte[], 
byte[]>> { - - @Override - public Tuple2<byte[], byte[]> deserialize(byte[] message) throws IOException { - throw new UnsupportedOperationException(); - } - } - - private static class JsonSchema extends AbstractDeserializationSchema<JSONPObject> { - - @Override - public JSONPObject deserialize(byte[] message) throws IOException { - throw new UnsupportedOperationException(); - } - } - - @SuppressWarnings("rawtypes") - private static class RawSchema extends AbstractDeserializationSchema { - - @Override - public Object deserialize(byte[] message) throws IOException { - throw new UnsupportedOperationException(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java deleted file mode 100644 index 317f2e3..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * Tests for {@link TypeInformationSerializationSchema}. 
- */ -public class TypeInformationSerializationSchemaTest { - - @Test - public void testDeSerialization() { - try { - TypeInformation<MyPOJO> info = TypeExtractor.getForClass(MyPOJO.class); - - TypeInformationSerializationSchema<MyPOJO> schema = - new TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig()); - - MyPOJO[] types = { - new MyPOJO(72, new Date(763784523L), new Date(88234L)), - new MyPOJO(-1, new Date(11111111111111L)), - new MyPOJO(42), - new MyPOJO(17, new Date(222763784523L)) - }; - - for (MyPOJO val : types) { - byte[] serialized = schema.serialize(val); - MyPOJO deser = schema.deserialize(serialized); - assertEquals(val, deser); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSerializability() { - try { - TypeInformation<MyPOJO> info = TypeExtractor.getForClass(MyPOJO.class); - TypeInformationSerializationSchema<MyPOJO> schema = - new TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig()); - - // this needs to succeed - CommonTestUtils.createCopySerializable(schema); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // ------------------------------------------------------------------------ - // Test data types - // ------------------------------------------------------------------------ - - private static class MyPOJO { - - public int aField; - public List<Date> aList; - - public MyPOJO() {} - - public MyPOJO(int iVal, Date... dates) { - this.aField = iVal; - this.aList = new ArrayList<>(Arrays.asList(dates)); - } - - @Override - public int hashCode() { - return aField; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof MyPOJO) { - MyPOJO that = (MyPOJO) obj; - return this.aField == that.aField && (this.aList == null ? 
- that.aList == null : - that.aList != null && this.aList.equals(that.aList)); - } - return super.equals(obj); - } - - @Override - public String toString() { - return "MyPOJO " + aField + " " + aList; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java deleted file mode 100644 index 6081ed1..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.util.serialization; - -import org.apache.flink.core.testutils.CommonTestUtils; - -import org.junit.Test; - -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -/** - * Tests for the {@link SimpleStringSchema}. - */ -public class SimpleStringSchemaTest { - - @Test - public void testSerializationWithAnotherCharset() { - final Charset charset = StandardCharsets.UTF_16BE; - final String string = "之掃描古籍版實乃姚鼐的"; - final byte[] bytes = string.getBytes(charset); - - assertArrayEquals(bytes, new SimpleStringSchema(charset).serialize(string)); - assertEquals(string, new SimpleStringSchema(charset).deserialize(bytes)); - } - - @Test - public void testSerializability() throws Exception { - final SimpleStringSchema schema = new SimpleStringSchema(StandardCharsets.UTF_16LE); - final SimpleStringSchema copy = CommonTestUtils.createCopySerializable(schema); - - assertEquals(schema.getCharset(), copy.getCharset()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index b5a7cd6..ef2e741 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner} import org.apache.flink.api.common.io.OutputFormat import 
org.apache.flink.api.common.operators.ResourceSpec +import org.apache.flink.api.common.serialization.SerializationSchema import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.tuple.{Tuple => JavaTuple} @@ -38,7 +39,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window} -import org.apache.flink.streaming.util.serialization.SerializationSchema import org.apache.flink.util.Collector import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala index 3b47429..991f241 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala @@ -18,8 +18,8 @@ package org.apache.flink.streaming.api.scala +import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.core.fs.FileSystem -import org.apache.flink.streaming.util.serialization.SimpleStringSchema import scala.language.existentials
