[scala] [streaming] Added package file for streaming scala api typeinfo implicits and conversions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/544b9248 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/544b9248 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/544b9248 Branch: refs/heads/release-0.8 Commit: 544b9248ae78d31caf56a2a28726bb967bd36087 Parents: acd2d60 Author: Gyula Fora <[email protected]> Authored: Tue Jan 6 12:05:22 2015 +0100 Committer: mbalassi <[email protected]> Committed: Tue Jan 6 15:47:53 2015 +0100 ---------------------------------------------------------------------- .../examples/windowing/TopSpeedWindowing.scala | 3 +- .../scala/examples/windowing/WindowJoin.scala | 3 +- .../api/scala/ConnectedDataStream.scala | 3 +- .../flink/streaming/api/scala/DataStream.scala | 3 -- .../streaming/api/scala/SplitDataStream.scala | 2 - .../api/scala/StreamCrossOperator.scala | 2 - .../api/scala/StreamExecutionEnvironment.scala | 4 +- .../api/scala/StreamJoinOperator.scala | 1 - .../api/scala/StreamingConversions.scala | 40 ----------------- .../api/scala/WindowedDataStream.scala | 2 - .../flink/streaming/api/scala/package.scala | 46 ++++++++++++++++++++ .../streaming/api/scala/windowing/Time.scala | 1 - 12 files changed, 50 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala index 54324e5..a86b953 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala @@ -21,8 +21,7 @@ package org.apache.flink.streaming.scala.examples.windowing import java.util.concurrent.TimeUnit._ -import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector import scala.math.{max, min} http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala index b109330..da69774 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala @@ -18,8 +18,7 @@ package org.apache.flink.streaming.scala.examples.windowing -import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector import scala.util.Random import java.util.concurrent.TimeUnit http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala index 65b04b2..b9a8794 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala @@ -25,12 +25,11 @@ import scala.reflect.ClassTag import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream} import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction } import org.apache.flink.streaming.api.invokable.operator.co.{ CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable } import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.scala.StreamingConversions._ import org.apache.flink.util.Collector class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 93444a5..1eedfc9 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -18,10 +18,8 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator, GroupedDataStream} -import org.apache.flink.api.common.typeinfo.TypeInformation import scala.reflect.ClassTag import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.functions.MapFunction @@ -47,7 +45,6 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.streaming.api.function.aggregation.AggregationFunction import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.streaming.api.scala.StreamingConversions._ import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator class DataStream[T](javaStream: JavaStream[T]) { http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala index a34248b..f8daa2e 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala @@ -18,9 +18,7 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } -import org.apache.flink.streaming.api.scala.StreamingConversions._ /** * The SplitDataStream represents an operator that has been split using an http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala index 67ac8ae..a8d4ac2 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala @@ -23,14 +23,12 @@ import org.apache.commons.lang.Validate import org.apache.flink.api.common.functions.CrossFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.scala._ import org.apache.flink.api.scala.typeutils.CaseClassSerializer import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream} import org.apache.flink.streaming.api.function.co.CrossWindowFunction import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.scala.StreamingConversions._ import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow import java.util.concurrent.TimeUnit http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index eb34b80..b9006de 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -19,14 +19,12 @@ package org.apache.flink.streaming.api.scala import scala.reflect.ClassTag - import org.apache.commons.lang.Validate import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv} import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction } -import org.apache.flink.streaming.api.scala.StreamingConversions.javaToScalaStream import org.apache.flink.util.Collector +import org.apache.flink.api.scala.ClosureCleaner class StreamExecutionEnvironment(javaEnv: JavaEnv) { http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala index 0d95c5e..0457c18 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala @@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } import org.apache.flink.streaming.api.function.co.JoinWindowFunction import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.scala.StreamingConversions.javaToScalaStream import org.apache.flink.streaming.util.keys.KeySelectorUtil import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow import java.util.concurrent.TimeUnit http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala deleted file mode 100644 index 08ebe03..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.scala - -import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } -import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream } -import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } -import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream } - -object StreamingConversions { - - implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] = - new DataStream[R](javaStream) - - implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] = - new WindowedDataStream[R](javaWStream) - - implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] = - new SplitDataStream[R](javaStream) - - implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): - ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala index 9b7edd5..8f6fcbb 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala @@ -27,13 +27,11 @@ import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.typeutils.TupleTypeInfoBase -import org.apache.flink.api.scala._ import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream} import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.function.aggregation.SumFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.scala.StreamingConversions._ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper import org.apache.flink.streaming.api.windowing.helper._ import org.apache.flink.util.Collector http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala new file mode 100644 index 0000000..f4c6bcf --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api + +import _root_.scala.reflect.ClassTag +import language.experimental.macros +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils} +import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } +import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream } +import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } +import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream } + +package object scala { + // We have this here so that we always have generated TypeInformationS when + // using the Scala API + implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T] + + implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] = + new DataStream[R](javaStream) + + implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] = + new WindowedDataStream[R](javaWStream) + + implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] = + new SplitDataStream[R](javaStream) + + implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): + ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream) +} http://git-wip-us.apache.org/repos/asf/flink/blob/544b9248/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala index 457d10d..16d3d20 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala @@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.scala.windowing import java.util.concurrent.TimeUnit import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime } -import org.apache.flink.api.scala.ClosureCleaner import org.apache.commons.net.ntp.TimeStamp import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean; import org.apache.flink.streaming.api.windowing.helper.Timestamp
