[scala] [streaming] Fixed scala formatting

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6f215b50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6f215b50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6f215b50

Branch: refs/heads/master
Commit: 6f215b5044b193d1443b941cc8848c1896d7cc88
Parents: 75dd021
Author: Gyula Fora <[email protected]>
Authored: Sun Dec 21 01:05:40 2014 +0100
Committer: Gyula Fora <[email protected]>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 .../flink/api/scala/streaming/DataStream.scala  | 93 +++++++++++++-------
 .../api/scala/streaming/FieldsKeySelector.scala |  2 +-
 .../api/scala/streaming/SplitDataStream.scala   | 17 ++--
 .../scala/streaming/StreamCrossOperator.scala   | 25 ++++--
 .../streaming/StreamExecutionEnvironment.scala  | 16 ++--
 .../scala/streaming/StreamJoinOperator.scala    | 29 +++---
 .../scala/streaming/WindowedDataStream.scala    | 31 ++++---
 7 files changed, 136 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f215b50/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index ecf5615..0cf4a60 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -82,7 +82,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     javaStream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
       case _ =>
-        throw new UnsupportedOperationException("Operator " + 
javaStream.toString + " cannot have " +
+        throw new UnsupportedOperationException("Operator " + 
javaStream.toString +  " cannot " +
+          "have " +
           "parallelism.")
     }
     this
@@ -94,7 +95,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def getParallelism: Int = javaStream match {
     case op: SingleOutputStreamOperator[_, _] => op.getParallelism
     case _ =>
-      throw new UnsupportedOperationException("Operator " + 
javaStream.toString + " does not have " +
+      throw new UnsupportedOperationException("Operator " + 
javaStream.toString + " does not have" +
+        " "  +
         "parallelism.")
   }
 
@@ -139,7 +141,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
   /**
    * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the selected fields. This setting only effects the how the 
outputs will be distributed between the parallel instances of the next 
processing operator.
+   * partitioned by the selected fields. This setting only effects the how the 
outputs will be
+   * distributed between the parallel instances of the next processing 
operator.
    *
    */
   def partitionBy(fields: Int*): DataStream[T] =
@@ -147,7 +150,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
   /**
    * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the selected fields. This setting only effects the how the 
outputs will be distributed between the parallel instances of the next 
processing operator.
+   * partitioned by the selected fields. This setting only effects the how the 
outputs will be
+   * distributed between the parallel instances of the next processing 
operator.
    *
    */
   def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
@@ -155,7 +159,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
   /**
    * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the given Key. This setting only effects the how the 
outputs will be distributed between the parallel instances of the next 
processing operator.
+   * partitioned by the given Key. This setting only effects the how the 
outputs will be
+   * distributed between the parallel instances of the next processing 
operator.
    *
    */
   def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
@@ -222,7 +227,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    *
    */
-  def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), 
maxWaitTimeMillis: Long = 0): DataStream[T] = {
+  def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]),  
maxWaitTimeMillis:
+    Long = 0): DataStream[T] = {
     val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
 
     val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
@@ -252,19 +258,24 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
   /**
    * Applies an aggregation that that gives the current minimum element of the 
data stream by
-   * the given position. When equality, the user can set to get the first or 
last element with the minimal value.
+   * the given position. When equality, the user can set to get the first or 
last element with
+   * the minimal value.
    *
    */
-  def minBy(position: Int, first: Boolean = true): DataStream[T] = 
aggregate(AggregationType.MINBY, position, first)
+  def minBy(position: Int, first: Boolean = true): DataStream[T] = 
aggregate(AggregationType
+    .MINBY, position, first)
 
   /**
    * Applies an aggregation that that gives the current maximum element of the 
data stream by
-   * the given position. When equality, the user can set to get the first or 
last element with the maximal value.
+   * the given position. When equality, the user can set to get the first or 
last element with
+   * the maximal value.
    *
    */
-  def maxBy(position: Int, first: Boolean = true): DataStream[T] = 
aggregate(AggregationType.MAXBY, position, first)
+  def maxBy(position: Int, first: Boolean = true): DataStream[T] =
+    aggregate(AggregationType.MAXBY, position, first)
 
-  private def aggregate(aggregationType: AggregationType, position: Int, 
first: Boolean = true): DataStream[T] = {
+  private def aggregate(aggregationType: AggregationType, position: Int, 
first: Boolean = true):
+    DataStream[T] = {
 
     val jStream = javaStream.asInstanceOf[JavaStream[Product]]
     val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
@@ -272,15 +283,18 @@ class DataStream[T](javaStream: JavaStream[T]) {
     val agg = new 
ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), 
position)
 
     val reducer = aggregationType match {
-      case AggregationType.SUM => new 
agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).getTypeClass()));
+      case AggregationType.SUM => new 
agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).
+        getTypeClass()));
       case _ => new agg.ProductComparableAggregator(aggregationType, first)
     }
 
     val invokable = jStream match {
-      case groupedStream: GroupedDataStream[_] => new 
GroupedReduceInvokable(reducer, groupedStream.getKeySelector())
+      case groupedStream: GroupedDataStream[_] => new 
GroupedReduceInvokable(reducer,
+        groupedStream.getKeySelector())
       case _ => new StreamReduceInvokable(reducer)
     }
-    new DataStream[Product](jStream.transform("aggregation", 
jStream.getType(), invokable)).asInstanceOf[DataStream[T]]
+    new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
+      invokable)).asInstanceOf[DataStream[T]]
   }
 
   /**
@@ -288,7 +302,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * received records.
    *
    */
-  def count: DataStream[Long] = new 
DataStream[java.lang.Long](javaStream.count()).asInstanceOf[DataStream[Long]]
+  def count: DataStream[Long] = new DataStream[java.lang.Long](
+    javaStream.count()).asInstanceOf[DataStream[Long]]
 
   /**
    * Creates a new DataStream by applying the given function to every element 
of this DataStream.
@@ -302,7 +317,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
       def map(in: T): R = cleanFun(in)
     }
 
-    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], 
new MapInvokable[T, R](mapper)))
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
+      new MapInvokable[T, R](mapper)))
   }
 
   /**
@@ -313,7 +329,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
       throw new NullPointerException("Map function must not be null.")
     }
 
-    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], 
new MapInvokable[T, R](mapper)))
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
+      new MapInvokable[T, R](mapper)))
   }
 
   /**
@@ -324,7 +341,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (flatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
-    new DataStream[R](javaStream.transform("flatMap", 
implicitly[TypeInformation[R]], new FlatMapInvokable[T, R](flatMapper)))
+    new DataStream[R](javaStream.transform("flatMap", 
implicitly[TypeInformation[R]],
+      new FlatMapInvokable[T, R](flatMapper)))
   }
 
   /**
@@ -358,22 +376,24 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream 
using an associative reduce
-   * function.
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
    */
   def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
     if (reducer == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
     javaStream match {
-      case ds: GroupedDataStream[_] => new 
DataStream[T](javaStream.transform("reduce", javaStream.getType(), new 
GroupedReduceInvokable[T](reducer, ds.getKeySelector())))
-      case _ => new DataStream[T](javaStream.transform("reduce", 
javaStream.getType(), new StreamReduceInvokable[T](reducer)))
+      case ds: GroupedDataStream[_] => new 
DataStream[T](javaStream.transform("reduce",
+        javaStream.getType(), new GroupedReduceInvokable[T](reducer, 
ds.getKeySelector())))
+      case _ => new DataStream[T](javaStream.transform("reduce", 
javaStream.getType(),
+        new StreamReduceInvokable[T](reducer)))
     }
   }
 
   /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream 
using an associative reduce
-   * function.
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
    */
   def reduce(fun: (T, T) => T): DataStream[T] = {
     if (fun == null) {
@@ -421,7 +441,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * the trigger and eviction policies please use to
    * window(List(triggers), List(evicters))
    */
-  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = 
new WindowedDataStream[T](javaStream.window(windowingHelper: _*))
+  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    new WindowedDataStream[T](javaStream.window(windowingHelper: _*))
 
   /**
    * Create a WindowedDataStream using the given TriggerPolicy-s and 
EvictionPolicy-s.
@@ -430,7 +451,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * use-cases please refer to window(WindowingHelper[_]*)
    *
    */
-  def window(triggers: List[TriggerPolicy[T]], evicters: 
List[EvictionPolicy[T]]): WindowedDataStream[T] = new 
WindowedDataStream[T](javaStream.window(triggers, evicters))
+  def window(triggers: List[TriggerPolicy[T]], evicters: 
List[EvictionPolicy[T]]):
+    WindowedDataStream[T] = new 
WindowedDataStream[T](javaStream.window(triggers, evicters))
 
   /**
    *
@@ -473,7 +495,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * to use custom join function.
    *
    */
-  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = new 
StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
+  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] =
+    new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
 
   /**
    * Initiates a temporal cross transformation that builds all pair
@@ -487,7 +510,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * to use custom join function.
    *
    */
-  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] = new 
StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
+  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
+    new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
 
   /**
    * Writes a DataStream to the standard output stream (stdout). For each
@@ -504,7 +528,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * is written.
    *
    */
-  def writeAsText(path: String, millis: Long = 0): DataStream[T] = new 
DataStream[T](javaStream.writeAsText(path, millis))
+  def writeAsText(path: String, millis: Long = 0): DataStream[T] =
+    new DataStream[T](javaStream.writeAsText(path, millis))
 
   /**
    * Writes a DataStream to the file specified by path in text format. The
@@ -513,7 +538,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * is written.
    *
    */
-  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] = new 
DataStream[T](javaStream.writeAsCsv(path, millis))
+  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
+    new DataStream[T](javaStream.writeAsCsv(path, millis))
 
   /**
    * Adds the given sink to this DataStream. Only streams with sinks added
@@ -521,7 +547,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * method is called.
    *
    */
-  def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] = new 
DataStream[T](javaStream.addSink(sinkFuntion))
+  def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] =
+    new DataStream[T](javaStream.addSink(sinkFuntion))
 
   /**
    * Adds the given sink to this DataStream. Only streams with sinks added
@@ -540,4 +567,4 @@ class DataStream[T](javaStream: JavaStream[T]) {
     this.addSink(sinkFunction)
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f215b50/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
index d7c9f96..b50d346 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
@@ -44,4 +44,4 @@ class FieldsKeySelector[IN](fields: Int*) extends 
KeySelector[IN, Tuple] {
       case _ => throw new RuntimeException("Only tuple types are supported")
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f215b50/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
index 0b0cce5..82a5c70 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -46,4 +47,4 @@ class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
    */
   def selectAll(): DataStream[T] = new DataStream[T](javaStream.selectAll())
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f215b50/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
index 5dfbc3b..5f579e5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
@@ -35,11 +35,13 @@ import 
org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.api.function.co.CrossWindowFunction
 import org.apache.flink.api.common.functions.CrossFunction
 
-class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) 
extends TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, 
i2) {
+class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) 
extends
+  TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
 
   override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, 
I2] = {
 
-    val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this, 
(l: I1, r: I2) => (l, r))
+    val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
+      (l: I1, r: I2) => (l, r))
 
     val returnType = new CaseClassTypeInfo[(I1, I2)](
 
@@ -69,24 +71,31 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: 
JavaStream[I2]) extend
 }
 object StreamCrossOperator {
 
-  private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2], 
javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) {
+  private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
+                                           javaStream: JavaStream[(I1, I2)]) 
extends
+    DataStream[(I1, I2)](javaStream) {
 
     /**
-     * Sets a wrapper for the crossed elements. For each crossed pair, the 
result of the udf call will be emitted.
+     * Sets a wrapper for the crossed elements. For each crossed pair, the 
result of the udf
+     * call will be emitted.
      *
      */
     def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] 
= {
 
       val invokable = new CoWindowInvokable[I1, I2, R](
-        clean(getCrossWindowFunction(op, fun)), op.windowSize, 
op.slideInterval, op.timeStamp1, op.timeStamp2)
+        clean(getCrossWindowFunction(op, fun)), op.windowSize, 
op.slideInterval, op.timeStamp1,
+        op.timeStamp2)
 
-      
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
 invokable)
+      
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+        invokable)
 
       new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
     }
   }
 
-  private[flink] def getCrossWindowFunction[I1, I2, R](op: 
StreamCrossOperator[I1, I2], crossFunction: (I1, I2) => R): 
CrossWindowFunction[I1, I2, R] = {
+  private[flink] def getCrossWindowFunction[I1, I2, R](op: 
StreamCrossOperator[I1, I2],
+                                                       crossFunction: (I1, I2) 
=> R):
+  CrossWindowFunction[I1, I2, R] = {
     Validate.notNull(crossFunction, "Join function must not be null.")
 
     val crossFun = new CrossFunction[I1, I2, R] {
@@ -100,4 +109,4 @@ object StreamCrossOperator {
     new CrossWindowFunction[I1, I2, R](crossFun)
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f215b50/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
index 340ecc1..55f7c6c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -117,7 +117,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    */
   def generateSequence(from: Long, to: Long): DataStream[Long] = {
-    new DataStream[java.lang.Long](javaEnv.generateSequence(from, 
to)).asInstanceOf[DataStream[Long]]
+    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
+      asInstanceOf[DataStream[Long]]
   }
 
   /**
@@ -147,7 +148,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
       "elements", typeInfo);
 
     javaEnv.getJobGraphBuilder.addStreamVertex(returnStream.getId(),
-      new SourceInvokable[T](new 
FromElementsFunction[T](scala.collection.JavaConversions.asJavaCollection(data))),
 null, typeInfo,
+      new SourceInvokable[T](new 
FromElementsFunction[T](scala.collection.JavaConversions
+        .asJavaCollection(data))), null, typeInfo,
       "source", 1);
     new DataStream(returnStream)
   }
@@ -204,7 +206,8 @@ object StreamExecutionEnvironment {
    * of parallelism of the local environment is the number of hardware 
contexts (CPU cores/threads).
    */
   def createLocalEnvironment(
-    degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()): 
StreamExecutionEnvironment = {
+    degreeOfParallelism: Int =  Runtime.getRuntime.availableProcessors()):
+  StreamExecutionEnvironment = {
     new 
StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
   }
 
@@ -223,7 +226,8 @@ object StreamExecutionEnvironment {
    *                 those must be
    *                 provided in the JAR files.
    */
-  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): 
StreamExecutionEnvironment = {
+  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
+  StreamExecutionEnvironment = {
     new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, 
jarFiles: _*))
   }
 
@@ -251,4 +255,4 @@ object StreamExecutionEnvironment {
     javaEnv.setDegreeOfParallelism(degreeOfParallelism)
     new StreamExecutionEnvironment(javaEnv)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f215b50/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
index fff5e86..7a39da5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
@@ -33,7 +33,8 @@ import scala.reflect.ClassTag
 import org.apache.commons.lang.Validate
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 
-class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) 
extends TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) 
{
+class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) 
extends
+TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
 
   override def createNextWindowOperator() = {
     new StreamJoinOperator.JoinWindow[I1, I2](this)
@@ -61,7 +62,8 @@ object StreamJoinOperator {
      * to define the second key.
      */
     def where(firstField: String, otherFields: String*) = {
-      new JoinPredicate[I1, I2](op, new 
PojoKeySelector[I1](op.input1.getType(), (firstField +: otherFields): _*))
+      new JoinPredicate[I1, I2](op, new 
PojoKeySelector[I1](op.input1.getType(),
+        (firstField +: otherFields): _*))
     }
 
     /**
@@ -82,7 +84,8 @@ object StreamJoinOperator {
 
   }
 
-  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, 
I2], private[flink] val keys1: KeySelector[I1, _]) {
+  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, 
I2],
+                              private[flink] val keys1: KeySelector[I1, _]) {
     private[flink] var keys2: KeySelector[I2, _] = null
 
     /**
@@ -145,30 +148,36 @@ object StreamJoinOperator {
         }
       }
 
-      return 
op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)).addGeneralWindowCombine(getJoinWindowFunction(this,
 (_, _)),
+      return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
+        .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
         returnType, op.windowSize, op.slideInterval, op.timeStamp1, 
op.timeStamp2)
     }
   }
 
-  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: 
JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) {
+  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: 
JavaStream[(I1, I2)]) extends
+  DataStream[(I1, I2)](javaStream) {
 
     private val op = jp.op
 
     /**
-     * Sets a wrapper for the joined elements. For each joined pair, the 
result of the udf call will be emitted.
+     * Sets a wrapper for the joined elements. For each joined pair, the 
result of the
+     * udf call will be emitted.
      */
     def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] 
= {
 
       val invokable = new CoWindowInvokable[I1, I2, R](
-        clean(getJoinWindowFunction(jp, fun)), op.windowSize, 
op.slideInterval, op.timeStamp1, op.timeStamp2)
+        clean(getJoinWindowFunction(jp, fun)), op.windowSize, 
op.slideInterval, op.timeStamp1,
+        op.timeStamp2)
 
-      
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
 invokable)
+      
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+        invokable)
 
       new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
     }
   }
 
-  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, 
I2], joinFunction: (I1, I2) => R) = {
+  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, 
I2],
+                                                      joinFunction: (I1, I2) 
=> R) = {
     Validate.notNull(joinFunction, "Join function must not be null.")
 
     val joinFun = new JoinFunction[I1, I2, R] {
@@ -183,4 +192,4 @@ object StreamJoinOperator {
     new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f215b50/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
index c037305..8c763fc 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -52,7 +52,8 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * This controls how often the user defined function will be triggered on
    * the window.
    */
-  def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = new 
WindowedDataStream[T](javaStream.every(windowingHelper: _*))
+  def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    new WindowedDataStream[T](javaStream.every(windowingHelper: _*))
 
   /**
    * Groups the elements of the WindowedDataStream using the given
@@ -126,12 +127,14 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
 
   /**
    * Applies a reduceGroup transformation on the windowed data stream by 
reducing
-   * the current window at every trigger. In contrast with the simple binary 
reduce operator, groupReduce exposes the whole window through the Iterable 
interface.
+   * the current window at every trigger. In contrast with the simple binary 
reduce operator,
+   * groupReduce exposes the whole window through the Iterable interface.
    * </br>
    * </br>
    * Whenever possible try to use reduce instead of groupReduce for increased 
efficiency
    */
-  def reduceGroup[R: ClassTag: TypeInformation](reducer: 
GroupReduceFunction[T, R]): DataStream[R] = {
+  def reduceGroup[R: ClassTag: TypeInformation](reducer: 
GroupReduceFunction[T, R]):
+  DataStream[R] = {
     if (reducer == null) {
       throw new NullPointerException("GroupReduce function must not be null.")
     }
@@ -140,12 +143,14 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
 
   /**
    * Applies a reduceGroup transformation on the windowed data stream by 
reducing
-   * the current window at every trigger. In contrast with the simple binary 
reduce operator, groupReduce exposes the whole window through the Iterable 
interface.
+   * the current window at every trigger. In contrast with the simple binary 
reduce operator,
+   * groupReduce exposes the whole window through the Iterable interface.
    * </br>
    * </br>
    * Whenever possible try to use reduce instead of groupReduce for increased 
efficiency
    */
-  def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], 
Collector[R]) => Unit): DataStream[R] = {
+  def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], 
Collector[R]) => Unit):
+  DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("GroupReduce function must not be null.")
     }
@@ -181,16 +186,19 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * the given position. When equality, returns the first.
    *
    */
-  def maxBy(position: Int, first: Boolean = true): DataStream[T] = 
aggregate(AggregationType.MAXBY, position, first)
+  def maxBy(position: Int, first: Boolean = true): DataStream[T] = 
aggregate(AggregationType.MAXBY,
+    position, first)
 
   /**
    * Applies an aggregation that that gives the minimum element of the window 
by
    * the given position. When equality, returns the first.
    *
    */
-  def minBy(position: Int, first: Boolean = true): DataStream[T] = 
aggregate(AggregationType.MINBY, position, first)
+  def minBy(position: Int, first: Boolean = true): DataStream[T] = 
aggregate(AggregationType.MINBY,
+    position, first)
 
-  def aggregate(aggregationType: AggregationType, position: Int, first: 
Boolean = true): DataStream[T] = {
+  def aggregate(aggregationType: AggregationType, position: Int, first: 
Boolean = true):
+  DataStream[T] = {
 
     val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
     val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
@@ -198,11 +206,12 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
     val agg = new 
ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), 
position)
 
     val reducer = aggregationType match {
-      case AggregationType.SUM => new 
agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).getTypeClass()));
+      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
+        outType.getTypeAt(position).getTypeClass()));
       case _ => new agg.ProductComparableAggregator(aggregationType, first)
     }
 
     new 
DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
   }
 
-}
\ No newline at end of file
+}

Reply via email to