Repository: flink
Updated Branches:
  refs/heads/master bd7819a1e -> 58e1e4437


FLINK-1420 Small cleanup on code after branch for 0.8 release

Small cleanup of the code after the branch for the 0.8 release:
-) Remove semicolons in Scala code for consistency.
-) Wrap some Java code lines that are too long for easy reading.
-) Fix a constant-name typo (from GENRAL_OPTIONS to GENERAL_OPTIONS).
-) Remove some unused imports in Scala and Java code.
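
As an illustrative sketch (not part of the commit itself), the Scala convention
applied throughout these diffs looks like the following; the identifiers are
taken from the changes below (Time.scala, DeltaPageRank.scala):

    // Before: trailing semicolons and an unused import.
    import org.apache.commons.net.ntp.TimeStamp  // unused, removed in Time.scala
    val maxIterations = 100;
    env.execute("Page Rank");

    // After: semicolons dropped (idiomatic Scala), unused import deleted.
    val maxIterations = 100
    env.execute("Page Rank")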

Author: Henry Saputra <[email protected]>

Closes #302 from hsaputra/cleanup_code_simple_1 and squashes the following commits:

f98431e [Henry Saputra] Remove unneeded semicolons from Scala code for consistency.
73fa587 [Henry Saputra] Replace the fully qualified Serializable name with an import in the InputSplitSource interface.
a403136 [Henry Saputra] Move the full package name into an import statement for consistency.
387e0c8 [Henry Saputra] Remove unnecessary parentheses for consistency.
47b6b4c [Henry Saputra] Small cleanup on code after branch for 0.8 release: Remove semicolons in Scala code. Remove some unused imports in Scala and Java code.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58e1e443
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58e1e443
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58e1e443

Branch: refs/heads/master
Commit: 58e1e4437ab308534b51f0de3a0d5ba1cbe3d1cc
Parents: bd7819a
Author: Henry Saputra <[email protected]>
Authored: Mon Jan 19 12:22:25 2015 -0800
Committer: Henry Saputra <[email protected]>
Committed: Mon Jan 19 12:22:25 2015 -0800

----------------------------------------------------------------------
 .../flink/streaming/api/scala/DataStream.scala  |  2 +-
 .../api/scala/StreamCrossOperator.scala         |  2 +-
 .../api/scala/StreamJoinOperator.scala          |  4 +--
 .../api/scala/WindowedDataStream.scala          |  2 +-
 .../streaming/api/scala/windowing/Delta.scala   |  2 +-
 .../streaming/api/scala/windowing/Time.scala    |  3 +--
 .../org/apache/flink/client/CliFrontend.java    |  4 +--
 .../org/apache/flink/client/program/Client.java |  6 ++---
 .../org/apache/flink/compiler/PactCompiler.java |  1 -
 .../org/apache/flink/core/io/InputSplit.java    |  4 ++-
 .../apache/flink/core/io/InputSplitSource.java  |  4 ++-
 .../examples/scala/graph/DeltaPageRank.scala    | 16 ++++++------
 .../examples/scala/relational/TPCHQuery10.scala |  2 +-
 .../examples/scala/relational/TPCHQuery3.scala  |  2 +-
 .../flink/api/java/operators/MapOperator.java   |  5 ++--
 .../apache/flink/runtime/client/JobClient.scala |  8 +++---
 .../flink/runtime/jobmanager/JobManager.scala   | 16 ++++++------
 .../runtime/minicluster/FlinkMiniCluster.scala  |  2 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 26 ++++++++++----------
 .../runtime/jobmanager/RecoveryITCase.scala     | 12 ++++-----
 .../runtime/testingUtils/TestingUtils.scala     |  4 +--
 .../org/apache/flink/api/scala/DataSet.scala    |  3 +--
 .../apache/flink/api/scala/coGroupDataSet.scala |  4 +--
 .../org/apache/flink/api/scala/package.scala    |  2 +-
 .../scala/typeutils/CaseClassComparator.scala   |  4 +--
 .../scala/typeutils/CaseClassSerializer.scala   |  3 +--
 .../scala/typeutils/TraversableSerializer.scala |  1 -
 .../scala/functions/ClosureCleanerITCase.scala  |  7 +-----
 .../misc/MassiveCaseClassSortingITCase.scala    | 10 ++++----
 .../api/scala/operators/GroupReduceITCase.scala |  2 +-
 .../scala/runtime/CaseClassComparatorTest.scala |  6 ++---
 .../runtime/KryoGenericTypeSerializerTest.scala |  8 +++---
 32 files changed, 84 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index f0ec78f..53b75a0 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -284,7 +284,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
     val reducer = aggregationType match {
      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).
-        getTypeClass()));
+        getTypeClass()))
       case _ => new agg.ProductComparableAggregator(aggregationType, true)
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index e300610..a69454c 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -61,7 +61,7 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extend
     val javaStream = input1.connect(input2).addGeneralWindowCombine(
       crossWindowFunction,
       returnType, windowSize,
-      slideInterval, timeStamp1, timeStamp2);
+      slideInterval, timeStamp1, timeStamp2)
 
     new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index 32765da..7ecd79a 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -49,7 +49,7 @@ object StreamJoinOperator {
  class JoinWindow[I1, I2](private[flink]op: StreamJoinOperator[I1, I2]) extends
   TemporalWindow[JoinWindow[I1, I2]] {
 
-    private[flink] val type1 = op.input1.getType();
+    private[flink] val type1 = op.input1.getType()
 
     /**
      * Continues a temporal Join transformation by defining
@@ -102,7 +102,7 @@ object StreamJoinOperator {
  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
     private[flink] val keys1: KeySelector[I1, _]) {
     private[flink] var keys2: KeySelector[I2, _] = null
-    private[flink] val type2 = op.input2.getType();
+    private[flink] val type2 = op.input2.getType()
 
     /**
      * Creates a temporal join transformation by defining the second join key.

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index 1908939..5c734bf 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -238,7 +238,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
 
     val reducer = aggregationType match {
       case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
-        outType.getTypeAt(position).getTypeClass()));
+        outType.getTypeAt(position).getTypeClass()))
       case _ => new agg.ProductComparableAggregator(aggregationType, true)
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
index 83f2293..eedee0e 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala.windowing
 
 import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
 import org.apache.commons.lang.Validate
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean;
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
 
 object Delta {

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
index 3581730..9a69369 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
@@ -21,8 +21,7 @@ package org.apache.flink.streaming.api.scala.windowing
 import java.util.concurrent.TimeUnit
 import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
 
-import org.apache.commons.net.ntp.TimeStamp
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean;
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.helper.Timestamp
 import org.apache.commons.lang.Validate
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index e95f72f..8092513 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -104,7 +104,7 @@ public class CliFrontend {
        }
        
        // general options
-       private static final Options GENRAL_OPTIONS = createGeneralOptions();
+       private static final Options GENERAL_OPTIONS = createGeneralOptions();
        
        // action options all include the general options
       private static final Options RUN_OPTIONS = getRunOptions(createGeneralOptions());
@@ -874,7 +874,7 @@ public class CliFrontend {
                formatter.setWidth(80);
                formatter.setLeftPadding(5);
                formatter.setSyntaxPrefix("  general options:");
-               formatter.printHelp(" ", GENRAL_OPTIONS);
+               formatter.printHelp(" ", GENERAL_OPTIONS);
                
                printHelpForRun();
                printHelpForInfo();

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index eab0899..b07444d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -301,8 +301,7 @@ public class Client {
 
                ActorRef client = pair._2();
 
-               String hostname = configuration.getString(ConfigConstants
-                               .JOB_MANAGER_IPC_ADDRESS_KEY, null);
+               String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
 
                FiniteDuration timeout = new FiniteDuration(configuration.getInteger(ConfigConstants
                                .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
@@ -320,8 +319,7 @@ public class Client {
                try {
 
                        if (wait) {
-                               return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client,
-                                               timeout);
+                               return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client, timeout);
                        }
                        else {
                                SubmissionResponse response =JobClient.submitJobDetached(jobGraph, client, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 4411d3e..5126135 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -491,7 +491,6 @@ public class PactCompiler {
                        throw new NullPointerException();
                }
                
-               
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Beginning compilation of program '" + program.getJobName() + '\'');
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
index 0eab661..02d1744 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.core.io;
 
+import java.io.Serializable;
+
 /**
 * This interface must be implemented by all kind of input splits that can be assigned to input formats.
  */
-public interface InputSplit extends IOReadableWritable, java.io.Serializable {
+public interface InputSplit extends IOReadableWritable, Serializable {
        
        /**
         * Returns the number of this input split.

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java
index 43065a8..3773c15 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java
@@ -18,13 +18,15 @@
 
 package org.apache.flink.core.io;
 
+import java.io.Serializable;
+
 /**
 * InputSplitSources create {@link InputSplit}s that define portions of data to be produced
  * by {@link org.apache.flink.api.common.io.InputFormat}s.
  *
  * @param <T> The type of the input splits created by the source.
  */
-public interface InputSplitSource<T extends InputSplit> extends java.io.Serializable {
+public interface InputSplitSource<T extends InputSplit> extends Serializable {
 
        /**
         * Computes the input splits. The given minimum number of splits is a hint as to how

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
index c6eb643..d111890 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
@@ -34,7 +34,7 @@ object DeltaPageRank {
 
   def main(args: Array[String]) {
 
-    val maxIterations = 100;
+    val maxIterations = 100
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 
@@ -64,10 +64,10 @@ object DeltaPageRank {
           }
 
           // random jump to self
-          out.collect((adj._1, RANDOM_JUMP));
+          out.collect((adj._1, RANDOM_JUMP))
         }
     }
-      .groupBy(0).sum(1);
+      .groupBy(0).sum(1)
 
    val initialDeltas = initialRanks.map { (page) => (page._1, page._2 - INITIAL_RANK) }
 
@@ -78,16 +78,16 @@ object DeltaPageRank {
           val deltas = workset.join(adjacency).where(0).equalTo(0) {
             (lastDeltas, adj, out: Collector[Page]) =>
               {
-                val targets = adj._2;
-                val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length;
+                val targets = adj._2
+                val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length
 
                 for (target <- targets) {
-                  out.collect((target, deltaPerTarget));
+                  out.collect((target, deltaPerTarget))
                 }
               }
           }
             .groupBy(0).sum(1)
-            .filter(x => Math.abs(x._2) > THRESHOLD);
+            .filter(x => Math.abs(x._2) > THRESHOLD)
 
           val rankUpdates = solutionSet.join(deltas).where(0).equalTo(0) {
             (current, delta) => (current._1, current._2 + delta._2)
@@ -99,6 +99,6 @@ object DeltaPageRank {
 
     iteration.print()
 
-    env.execute("Page Rank");
+    env.execute("Page Rank")
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
index ec31461..b39a2dd 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
@@ -146,7 +146,7 @@ object TPCHQuery10 {
           "  Due to legal restrictions, we can not ship generated data.\n" +
           "  You can find the TPC-H data generator at 
http://www.tpc.org/tpch/.\n"; +
           "  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> " + 
-                                "<lineitem-csv path> <nation-csv path> <result path>");
+                                "<lineitem-csv path> <nation-csv path> <result path>")
       false
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
index 44b002e..6b8b0fd 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
@@ -143,7 +143,7 @@ object TPCHQuery3 {
           " Due to legal restrictions, we can not ship generated data.\n" +
           " You can find the TPC-H data generator at 
http://www.tpc.org/tpch/.\n"; +
           " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + 
-                             "<orders-csv path> <result path>");
+                             "<orders-csv path> <result path>")
       false
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 9e96c64..7d2bbaa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -53,11 +53,12 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
        }
        
        @Override
-       protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+       protected MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
                
                String name = getName() != null ? getName() : "Map at "+defaultName;
                // create operator
-               MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+               MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function,
+                               new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
                // set input
                po.setInput(input);
                // set dop

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index e34c660..6a4beed 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -38,8 +38,8 @@ import scala.concurrent.{TimeoutException, Await}
 import scala.concurrent.duration.{FiniteDuration}
 
 
-class JobClient(jobManagerURL: String, timeout: FiniteDuration) extends Actor with ActorLogMessages
-with  ActorLogging{
+class JobClient(jobManagerURL: String, timeout: FiniteDuration)
+  extends Actor with ActorLogMessages with  ActorLogging{
   import context._
 
   val jobManager = AkkaUtils.getReference(jobManagerURL)(system, timeout)
@@ -100,9 +100,9 @@ object JobClient{
       case url: String => url
       case _ =>
         val jobManagerAddress = configuration.getString(ConfigConstants
-          .JOB_MANAGER_IPC_ADDRESS_KEY, null);
+          .JOB_MANAGER_IPC_ADDRESS_KEY, null)
        val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
         if (jobManagerAddress == null) {
          throw new RuntimeException("JobManager address has not been specified in the " +

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 16495ca..9dbd581 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.File
-import java.net.{InetSocketAddress}
+import java.net.InetSocketAddress
 import java.util.concurrent.TimeUnit
 
 import akka.actor._
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.{JobException, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.{InstanceManager}
+import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
 import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
@@ -45,18 +45,18 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit, He
 import org.apache.flink.runtime.profiling.ProfilingUtils
 import org.slf4j.LoggerFactory
 
-import scala.concurrent.{Future}
+import scala.concurrent.Future
 import scala.concurrent.duration._
 
-class JobManager(val configuration: Configuration) extends
-Actor with ActorLogMessages with ActorLogging {
+class JobManager(val configuration: Configuration)
+  extends Actor with ActorLogMessages with ActorLogging {
   import context._
   import scala.collection.JavaConverters._
 
  implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
     ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
 
-  Execution.timeout = timeout;
+  Execution.timeout = timeout
 
   log.info(s"Starting job manager at ${self.path}.")
 
@@ -115,7 +115,7 @@ Actor with ActorLogMessages with ActorLogging {
         hardwareInformation, numberOfSlots)
 
       // to be notified when the taskManager is no longer reachable
-      context.watch(taskManager);
+      context.watch(taskManager)
 
      taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
     }
@@ -532,7 +532,7 @@ object JobManager {
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
     val executionRetries = configuration.getInteger(ConfigConstants
-      .DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES);
+      .DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES)
 
     val delayBetweenRetries = 2 * configuration.getLong(
       ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 1b7d452..16884ca 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -123,7 +123,7 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration,
   def stop(): Unit = {
     LOG.info("Stopping FlinkMiniCluster.")
     shutdown()
-    awaitTermination();
+    awaitTermination()
   }
 
   def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 620a436..2872678 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -96,13 +96,13 @@ import scala.collection.JavaConverters._
   TaskManager.checkTempDirs(tmpDirPaths)
   val ioManager = new IOManagerAsync(tmpDirPaths)
  val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize)
-  val bcVarManager = new BroadcastVariableManager();
+  val bcVarManager = new BroadcastVariableManager()
  val hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
   val fileCache = new FileCache()
  val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, Task]()
 
  // Actors which want to be notified once this task manager has been registered at the job manager
-  val waitForRegistration = scala.collection.mutable.Set[ActorRef]();
+  val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
 
   val profiler = profilingInterval match {
    case Some(interval) => Some(TaskManager.startProfiler(self.path.toSerializationFormat,
@@ -116,7 +116,7 @@ import scala.collection.JavaConverters._
   var registrationAttempts: Int = 0
   var registered: Boolean = false
   var currentJobManager = ActorRef.noSender
-  var instanceID: InstanceID = null;
+  var instanceID: InstanceID = null
   var heartbeatScheduler: Option[Cancellable] = None
 
   if (log.isDebugEnabled) {
@@ -187,7 +187,7 @@ import scala.collection.JavaConverters._
        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
       }
       else {
-        log.error("TaskManager could not register at JobManager.");
+        log.error("TaskManager could not register at JobManager.")
         self ! PoisonPill
       }
     }
@@ -321,7 +321,7 @@ import scala.collection.JavaConverters._
       if (log.isDebugEnabled) {
         startRegisteringTask = System.currentTimeMillis()
       }
-      libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles());
+      libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles())
 
       if (log.isDebugEnabled) {
         log.debug(s"Register task ${executionID} took ${
@@ -436,11 +436,11 @@ import scala.collection.JavaConverters._
                 }
                 sender ! TaskOperationResult(executionId, true)
              case None => sender ! TaskOperationResult(executionId, false, "No reader with ID " +
-                resultId + " was found.");
+                resultId + " was found.")
             }
 
          case None => sender ! TaskOperationResult(executionId, false, "No task with execution" +
-            "ID " + executionId + " was found.");
+            "ID " + executionId + " was found.")
         }
     }
   }
@@ -598,14 +598,14 @@ object TaskManager {
         }
 
         val jobManagerHostname = configuration.getString(ConfigConstants
-          .JOB_MANAGER_IPC_ADDRESS_KEY, null);
+          .JOB_MANAGER_IPC_ADDRESS_KEY, null)
        val jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
           ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-        val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort);
+        val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
 
        val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0)
-        val hostname = NetUtils.resolveAddress(jobManagerAddress).getHostName;
+        val hostname = NetUtils.resolveAddress(jobManagerAddress).getHostName
 
         (hostname, port, configuration)
     } getOrElse {
@@ -641,9 +641,9 @@ object TaskManager {
       case url: String => url
       case _ =>
         val jobManagerAddress = configuration.getString(ConfigConstants
-          .JOB_MANAGER_IPC_ADDRESS_KEY, null);
+          .JOB_MANAGER_IPC_ADDRESS_KEY, null)
        val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
         if (jobManagerAddress == null) {
          throw new RuntimeException("JobManager address has not been specified in the " +
@@ -675,7 +675,7 @@ object TaskManager {
 
    val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize, nettyConfig)
 
-    val networkBufferMem = if (localExecution) 0 else numNetworkBuffers * pageSize;
+    val networkBufferMem = if (localExecution) 0 else numNetworkBuffers * pageSize
 
     val configuredMemory: Long = configuration.getInteger(ConfigConstants
       .TASK_MANAGER_MEMORY_SIZE_KEY, -1)

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index c8088b6..6ed7620 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -45,8 +45,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     "recover once failing forward job" in {
       FailingOnceReceiver.failed = false
 
-      val sender = new AbstractJobVertex("Sender");
-      val receiver = new AbstractJobVertex("Receiver");
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Tasks.Sender])
       receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver])
@@ -84,8 +84,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     "recover once failing forward job with slot sharing" in {
       FailingOnceReceiver.failed = false
 
-      val sender = new AbstractJobVertex("Sender");
-      val receiver = new AbstractJobVertex("Receiver");
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Tasks.Sender])
       receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver])
@@ -127,8 +127,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     "recover a task manager failure" in {
       BlockingOnceReceiver.blocking = true
 
-      val sender = new AbstractJobVertex("Sender");
-      val receiver = new AbstractJobVertex("Receiver");
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Tasks.Sender])
       receiver.setInvokableClass(classOf[Tasks.BlockingOnceReceiver])

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 49bb4e0..37b99b1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -93,7 +93,7 @@ object TestingUtils {
  def startTestingTaskManagerWithConfiguration(hostname: String, config: Configuration)
                                              (implicit system: ActorSystem) = {
    val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) =
-      TaskManager.parseConfiguration(hostname, config);
+      TaskManager.parseConfiguration(hostname, config)
 
    system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
       networkConnectionConfig) with TestingTaskManager))
@@ -118,7 +118,7 @@ object TestingUtils {
   }
 
   def setExecutionContext(context: ExecutionContext): Unit = {
-    AkkaUtils.globalExecutionContext = context;
+    AkkaUtils.globalExecutionContext = context
   }
 
  class QueuedActionExecutionContext(queue: ActionQueue) extends ExecutionContext {

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 0366125c..94c5461 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -33,7 +33,6 @@ import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
import org.apache.flink.api.scala.operators.{ScalaCsvOutputFormat, ScalaAggregateOperator}
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.util.Collector
@@ -1105,7 +1104,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
       def getKey(in: T) = cleanFun(in)
     }
     
-    val keyType = implicitly[TypeInformation[K]];
+    val keyType = implicitly[TypeInformation[K]]
 
     val op = new PartitionOperator[T](
       javaSet,

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
index 8fd4286..54374ba 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
@@ -256,9 +256,9 @@ class CoGroupDataSet[L, R](
                                      new ImmutablePair[java.lang.Integer, Order](position, order))
         
         case ( Right(expression), order ) => {
-          if (! (typeInfo.isInstanceOf[CompositeType[_]])) {
+          if (!typeInfo.isInstanceOf[CompositeType[_]]) {
            throw new InvalidProgramException("Specifying order keys via field positions is only "
-                                   + "valid for composite data types (pojo / tuple / case class)");
+                                   + "valid for composite data types (pojo / tuple / case class)")
           }
           else {
             val ek = new ExpressionKeys[T](Array[String](expression), typeInfo)

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index dbaf181..a0c0b58 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -64,7 +64,7 @@ package object scala {
     }
   }
   def getCallLocationName(depth: Int = 3) : String = {
-    val st = Thread.currentThread().getStackTrace();
+    val st = Thread.currentThread().getStackTrace()
     if(st.length < depth) {
       return "<unknown>"
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala
index dbd80ea..2e3d07a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala
@@ -17,12 +17,10 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator,
-TypeSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
 import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
 import org.apache.flink.core.memory.MemorySegment
import org.apache.flink.types.{KeyFieldOutOfBoundsException, NullKeyFieldException}
-;
 
 /**
  * Comparator for Case Classes. Access is different from

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index e2c361f..eb2441d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -20,7 +20,6 @@ package org.apache.flink.api.scala.typeutils
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
 import org.apache.flink.core.memory.{DataOutputView, DataInputView}
-;
 
 /**
  * Serializer for Case Classes. Creation and access is different from
@@ -84,7 +83,7 @@ abstract class CaseClassSerializer[T <: Product](
   }
 
   def deserialize(reuse: T, source: DataInputView): T = {
-    deserialize(source);
+    deserialize(source)
   }
 
   def deserialize(source: DataInputView): T = {

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 5468637..fa519d9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.core.memory.{DataOutputView, DataInputView}
 
 import scala.collection.generic.CanBuildFrom
-;
 
 /**
  * Serializer for Scala Collections.

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index ba66695..41c96b0 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -18,19 +18,14 @@
 package org.apache.flink.api.scala.functions
 
 import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, JavaProgramTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.junit.Assert.fail
 import org.junit.{After, Before, Test, Rule}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.common.InvalidProgramException

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
index 7c6bcaf..6e5296b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
@@ -32,7 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory
-import org.junit.Assert._;
+import org.junit.Assert._
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
 
 class MassiveCaseClassSortingITCase {
@@ -79,7 +79,7 @@ class MassiveCaseClassSortingITCase {
         val inputIterator = new StringTupleReader(reader)
         
         val typeInfo = implicitly[TypeInformation[StringTuple]]
-                                             .asInstanceOf[CompositeType[StringTuple]];
+          .asInstanceOf[CompositeType[StringTuple]]
         
         val serializer = typeInfo.createSerializer()
        val comparator = typeInfo.createComparator(Array(0, 1), Array(true, true), 0)
@@ -99,7 +99,7 @@ class MassiveCaseClassSortingITCase {
         val verifyIterator = new StringTupleReader(verifyReader)
         
         var num = 0
-        var hasMore = true;
+        var hasMore = true
         
         while (hasMore) {
           val next = verifyIterator.next(null)
@@ -127,7 +127,7 @@ class MassiveCaseClassSortingITCase {
         }
         
         assertNull(sortedData.next(null))
-        assertEquals(NUM_STRINGS, num);
+        assertEquals(NUM_STRINGS, num)
       }
       finally {
         if (reader != null) {
@@ -200,7 +200,7 @@ class MassiveCaseClassSortingITCase {
 object MassiveCaseClassSortingITCase {
   
   def main(args: Array[String]) {
-    new MassiveCaseClassSortingITCase().testStringTuplesSorting;
+    new MassiveCaseClassSortingITCase().testStringTuplesSorting()
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index 1d806c2..ce3bba3 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -712,7 +712,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo
       "2,5,0,Hallo Welt-Hallo Welt wie,1\n" +
       "3,15,0,BCD-ABC-Hallo Welt wie gehts?,2\n" +
       "4,34,0,FGH-CDE-EFG-DEF,1\n" +
-      "5,65,0,IJK-HIJ-KLM-JKL-GHI,1\n";
+      "5,65,0,IJK-HIJ-KLM-JKL-GHI,1\n"
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
index d150e85..4718395 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
@@ -48,7 +48,7 @@ class CaseClassComparatorTest {
      val typeInfo = implicitly[TypeInformation[CaseTestClass]]
                                     .asInstanceOf[CompositeType[CaseTestClass]]
       
-      val serializer = typeInfo.createSerializer();
+      val serializer = typeInfo.createSerializer()
       val comparator = new FailingCompareDeserializedWrapper(
          typeInfo.createComparator(Array[Int](0, 2), Array[Boolean](true, true), 0))
       
@@ -98,7 +98,7 @@ class CaseClassComparatorTest {
     catch {
       case e: Exception => {
         e.printStackTrace()
-        fail(e.getMessage());
+        fail(e.getMessage())
       }
     }
   }
@@ -120,7 +120,7 @@ class CaseClassComparatorTest {
     def compare(first: T, second: T): Int = wrapped.compare(first, second)
     
    def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
-      throw new UnsupportedOperationException("Not Supported");
+      throw new UnsupportedOperationException("Not Supported")
     }
     
     def supportsNormalizedKey(): Boolean = wrapped.supportsNormalizedKey()

http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
index 37e334b..d61edff 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -152,12 +152,12 @@ class KryoGenericTypeSerializerTest {
class LocalDateSerializer extends Serializer[LocalDate] with java.io.Serializable {
 
   override def write(kryo: Kryo, output: Output, obj: LocalDate) {
-    output.writeInt(obj.getYear());
-    output.writeInt(obj.getMonthOfYear());
-    output.writeInt(obj.getDayOfMonth());
+    output.writeInt(obj.getYear())
+    output.writeInt(obj.getMonthOfYear())
+    output.writeInt(obj.getDayOfMonth())
   }
 
  override def read(kryo: Kryo, input: Input, typeClass: Class[LocalDate]) : LocalDate = {
-    new LocalDate(input.readInt(), input.readInt(), input.readInt());
+    new LocalDate(input.readInt(), input.readInt(), input.readInt())
   }
 }
