[scala] [streaming] Added package file for streaming scala api typeinfo implicits and conversions
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5daa45c2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5daa45c2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5daa45c2 Branch: refs/heads/master Commit: 5daa45c2c2f296e5018a2fb2b1989fa3d32f59fb Parents: 8500ad0 Author: Gyula Fora <[email protected]> Authored: Tue Jan 6 12:05:22 2015 +0100 Committer: mbalassi <[email protected]> Committed: Tue Jan 6 15:09:04 2015 +0100 ---------------------------------------------------------------------- .../examples/windowing/TopSpeedWindowing.scala | 3 +- .../scala/examples/windowing/WindowJoin.scala | 3 +- .../api/scala/ConnectedDataStream.scala | 3 +- .../flink/streaming/api/scala/DataStream.scala | 3 -- .../streaming/api/scala/SplitDataStream.scala | 2 - .../api/scala/StreamCrossOperator.scala | 2 - .../api/scala/StreamExecutionEnvironment.scala | 4 +- .../api/scala/StreamJoinOperator.scala | 1 - .../api/scala/StreamingConversions.scala | 40 ----------------- .../api/scala/WindowedDataStream.scala | 2 - .../flink/streaming/api/scala/package.scala | 46 ++++++++++++++++++++ .../streaming/api/scala/windowing/Time.scala | 1 - 12 files changed, 50 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala index dc01f02..a18eb37 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala @@ -21,8 +21,7 @@ package org.apache.flink.streaming.scala.examples.windowing import java.util.concurrent.TimeUnit._ -import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector import scala.math.{max, min} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala index e87d4a1..08c7d65 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala @@ -18,8 +18,7 @@ package org.apache.flink.streaming.scala.examples.windowing -import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector import scala.util.Random import java.util.concurrent.TimeUnit http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala index 320bfa0..d60e796 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala @@ -25,12 +25,11 @@ import scala.reflect.ClassTag import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream} import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction } import org.apache.flink.streaming.api.invokable.operator.co.{ CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable } import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.scala.StreamingConversions._ import org.apache.flink.util.Collector class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 270b80c..6d94de7 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -18,10 +18,8 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator, GroupedDataStream} -import org.apache.flink.api.common.typeinfo.TypeInformation import scala.reflect.ClassTag import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.functions.MapFunction @@ -47,7 +45,6 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.streaming.api.function.aggregation.AggregationFunction import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.streaming.api.scala.StreamingConversions._ import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator class DataStream[T](javaStream: JavaStream[T]) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala index 7349db6..a4156a1 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala @@ -18,9 +18,7 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } -import org.apache.flink.streaming.api.scala.StreamingConversions._ /** * The SplitDataStream represents an operator that has been split using an http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala index d620d5e..e300610 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala @@ -23,14 +23,12 @@ import org.apache.commons.lang.Validate import org.apache.flink.api.common.functions.CrossFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.scala._ import org.apache.flink.api.scala.typeutils.CaseClassSerializer import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream} import org.apache.flink.streaming.api.function.co.CrossWindowFunction import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.scala.StreamingConversions._ import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow import java.util.concurrent.TimeUnit http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 61a6109..b4565c7 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -19,14 +19,12 @@ package org.apache.flink.streaming.api.scala import scala.reflect.ClassTag - import org.apache.commons.lang.Validate import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv} import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction } -import org.apache.flink.streaming.api.scala.StreamingConversions.javaToScalaStream import org.apache.flink.util.Collector +import org.apache.flink.api.scala.ClosureCleaner class StreamExecutionEnvironment(javaEnv: JavaEnv) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala index cb79e2a..32765da 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala @@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } import org.apache.flink.streaming.api.function.co.JoinWindowFunction import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.scala.StreamingConversions.javaToScalaStream import org.apache.flink.streaming.util.keys.KeySelectorUtil import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow import java.util.concurrent.TimeUnit http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala deleted file mode 100644 index fb3745f..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.scala - -import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } -import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream } -import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } -import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream } - -object StreamingConversions { - - implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] = - new DataStream[R](javaStream) - - implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] = - new WindowedDataStream[R](javaWStream) - - implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] = - new SplitDataStream[R](javaStream) - - implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): - ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream) - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala index deda3d9..0b88137 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala @@ -27,13 +27,11 @@ import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.typeutils.TupleTypeInfoBase -import org.apache.flink.api.scala._ import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream} import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.function.aggregation.SumFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.scala.StreamingConversions._ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper import org.apache.flink.streaming.api.windowing.helper._ import org.apache.flink.util.Collector http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala new file mode 100644 index 0000000..3604b55 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api + +import _root_.scala.reflect.ClassTag +import language.experimental.macros +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils} +import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } +import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream } +import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } +import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream } + +package object scala { + // We have this here so that we always have generated TypeInformationS when + // using the Scala API + implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T] + + implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] = + new DataStream[R](javaStream) + + implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] = + new WindowedDataStream[R](javaWStream) + + implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] = + new SplitDataStream[R](javaStream) + + implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): + ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream) +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala index e1b9768..3581730 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala @@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.scala.windowing import java.util.concurrent.TimeUnit import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime } -import org.apache.flink.api.scala.ClosureCleaner import org.apache.commons.net.ntp.TimeStamp import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean; import org.apache.flink.streaming.api.windowing.helper.Timestamp
