Repository: flink Updated Branches: refs/heads/master bd7819a1e -> 58e1e4437
FLINK-1420 Small cleanup on code after branch for 0.8 release Small cleanup on code after branch for 0.8 release: -) Remove semicolons in Scala code for consistencies. -) Wrap some code in Java that is too long for easy read. -) Fix constant typo (from GENRAL_OPTIONS to GENERAL_OPTIONS) -) Remove some unused imports in Scala and Java code. Author: Henry Saputra <[email protected]> Closes #302 from hsaputra/cleanup_code_simple_1 and squashes the following commits: f98431e [Henry Saputra] Remove not needed semicolons from Scala code for consistency. 73fa587 [Henry Saputra] Move full class name to import for Serializable interface as import in InputSplitSource interface. a403136 [Henry Saputra] Move full package name to import statement for consistency. 387e0c8 [Henry Saputra] Remove unnecessary parentheses for consistency. 47b6b4c [Henry Saputra] Small cleanup on code after branch for 0.8 release: Remove semicolons in Scala code. Remove some unused imports in Scala and Java code. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58e1e443 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58e1e443 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58e1e443 Branch: refs/heads/master Commit: 58e1e4437ab308534b51f0de3a0d5ba1cbe3d1cc Parents: bd7819a Author: Henry Saputra <[email protected]> Authored: Mon Jan 19 12:22:25 2015 -0800 Committer: Henry Saputra <[email protected]> Committed: Mon Jan 19 12:22:25 2015 -0800 ---------------------------------------------------------------------- .../flink/streaming/api/scala/DataStream.scala | 2 +- .../api/scala/StreamCrossOperator.scala | 2 +- .../api/scala/StreamJoinOperator.scala | 4 +-- .../api/scala/WindowedDataStream.scala | 2 +- .../streaming/api/scala/windowing/Delta.scala | 2 +- .../streaming/api/scala/windowing/Time.scala | 3 +-- .../org/apache/flink/client/CliFrontend.java | 4 +-- .../org/apache/flink/client/program/Client.java | 6 ++--- .../org/apache/flink/compiler/PactCompiler.java | 1 - .../org/apache/flink/core/io/InputSplit.java | 4 ++- .../apache/flink/core/io/InputSplitSource.java | 4 ++- .../examples/scala/graph/DeltaPageRank.scala | 16 ++++++------ .../examples/scala/relational/TPCHQuery10.scala | 2 +- .../examples/scala/relational/TPCHQuery3.scala | 2 +- .../flink/api/java/operators/MapOperator.java | 5 ++-- .../apache/flink/runtime/client/JobClient.scala | 8 +++--- .../flink/runtime/jobmanager/JobManager.scala | 16 ++++++------ .../runtime/minicluster/FlinkMiniCluster.scala | 2 +- .../flink/runtime/taskmanager/TaskManager.scala | 26 ++++++++++---------- .../runtime/jobmanager/RecoveryITCase.scala | 12 ++++----- .../runtime/testingUtils/TestingUtils.scala | 4 +-- .../org/apache/flink/api/scala/DataSet.scala | 3 +-- .../apache/flink/api/scala/coGroupDataSet.scala | 4 +-- .../org/apache/flink/api/scala/package.scala | 2 +- .../scala/typeutils/CaseClassComparator.scala | 4 +-- .../scala/typeutils/CaseClassSerializer.scala | 3 +-- .../scala/typeutils/TraversableSerializer.scala | 1 - .../scala/functions/ClosureCleanerITCase.scala | 7 +----- .../misc/MassiveCaseClassSortingITCase.scala | 10 ++++---- .../api/scala/operators/GroupReduceITCase.scala | 2 +- .../scala/runtime/CaseClassComparatorTest.scala | 6 ++--- .../runtime/KryoGenericTypeSerializerTest.scala | 8 +++--- 32 files changed, 84 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index f0ec78f..53b75a0 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -284,7 +284,7 @@ class DataStream[T](javaStream: JavaStream[T]) { val reducer = aggregationType match { case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position). - getTypeClass())); + getTypeClass())) case _ => new agg.ProductComparableAggregator(aggregationType, true) } http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala index e300610..a69454c 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala @@ -61,7 +61,7 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extend val javaStream = input1.connect(input2).addGeneralWindowCombine( crossWindowFunction, returnType, windowSize, - slideInterval, timeStamp1, timeStamp2); + slideInterval, timeStamp1, timeStamp2) new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream) } http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala index 32765da..7ecd79a 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala @@ -49,7 +49,7 @@ object StreamJoinOperator { class JoinWindow[I1, I2](private[flink]op: StreamJoinOperator[I1, I2]) extends TemporalWindow[JoinWindow[I1, I2]] { - private[flink] val type1 = op.input1.getType(); + private[flink] val type1 = op.input1.getType() /** * Continues a temporal Join transformation by defining @@ -102,7 +102,7 @@ object StreamJoinOperator { class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], private[flink] val keys1: KeySelector[I1, _]) { private[flink] var keys2: KeySelector[I2, _] = null - private[flink] val type2 = op.input2.getType(); + private[flink] val type2 = op.input2.getType() /** * Creates a temporal join transformation by defining the second join key. http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala index 1908939..5c734bf 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala @@ -238,7 +238,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { val reducer = aggregationType match { case AggregationType.SUM => new agg.Sum(SumFunction.getForClass( - outType.getTypeAt(position).getTypeClass())); + outType.getTypeAt(position).getTypeClass())) case _ => new agg.ProductComparableAggregator(aggregationType, true) } http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala index 83f2293..eedee0e 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala @@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala.windowing import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta } import org.apache.commons.lang.Validate -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean; +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction object Delta { http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala index 3581730..9a69369 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala @@ -21,8 +21,7 @@ package org.apache.flink.streaming.api.scala.windowing import java.util.concurrent.TimeUnit import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime } -import org.apache.commons.net.ntp.TimeStamp -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean; +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean import org.apache.flink.streaming.api.windowing.helper.Timestamp import org.apache.commons.lang.Validate http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index e95f72f..8092513 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -104,7 +104,7 @@ public class CliFrontend { } // general options - private static final Options GENRAL_OPTIONS = createGeneralOptions(); + private static final Options GENERAL_OPTIONS = createGeneralOptions(); // action options all include the general options private static final Options RUN_OPTIONS = getRunOptions(createGeneralOptions()); @@ -874,7 +874,7 @@ public class CliFrontend { formatter.setWidth(80); formatter.setLeftPadding(5); formatter.setSyntaxPrefix(" general options:"); - formatter.printHelp(" ", GENRAL_OPTIONS); + formatter.printHelp(" ", GENERAL_OPTIONS); printHelpForRun(); printHelpForInfo(); http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index eab0899..b07444d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -301,8 +301,7 @@ public class Client { ActorRef client = pair._2(); - String hostname = configuration.getString(ConfigConstants - .JOB_MANAGER_IPC_ADDRESS_KEY, null); + String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); FiniteDuration timeout = new FiniteDuration(configuration.getInteger(ConfigConstants .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS); @@ -320,8 +319,7 @@ public class Client { try { if (wait) { - return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client, - timeout); + return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client, timeout); } else { SubmissionResponse response =JobClient.submitJobDetached(jobGraph, client, timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java index 4411d3e..5126135 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java @@ -491,7 +491,6 @@ public class PactCompiler { throw new NullPointerException(); } - if (LOG.isDebugEnabled()) { LOG.debug("Beginning compilation of program '" + program.getJobName() + '\''); } http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java index 0eab661..02d1744 100644 --- a/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java +++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java @@ -18,10 +18,12 @@ package org.apache.flink.core.io; +import java.io.Serializable; + /** * This interface must be implemented by all kind of input splits that can be assigned to input formats. */ -public interface InputSplit extends IOReadableWritable, java.io.Serializable { +public interface InputSplit extends IOReadableWritable, Serializable { /** * Returns the number of this input split. http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java index 43065a8..3773c15 100644 --- a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java +++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java @@ -18,13 +18,15 @@ package org.apache.flink.core.io; +import java.io.Serializable; + /** * InputSplitSources create {@link InputSplit}s that define portions of data to be produced * by {@link org.apache.flink.api.common.io.InputFormat}s. * * @param <T> The type of the input splits created by the source. */ -public interface InputSplitSource<T extends InputSplit> extends java.io.Serializable { +public interface InputSplitSource<T extends InputSplit> extends Serializable { /** * Computes the input splits. The given minimum number of splits is a hint as to how http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala index c6eb643..d111890 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala @@ -34,7 +34,7 @@ object DeltaPageRank { def main(args: Array[String]) { - val maxIterations = 100; + val maxIterations = 100 val env = ExecutionEnvironment.getExecutionEnvironment @@ -64,10 +64,10 @@ object DeltaPageRank { } // random jump to self - out.collect((adj._1, RANDOM_JUMP)); + out.collect((adj._1, RANDOM_JUMP)) } } - .groupBy(0).sum(1); + .groupBy(0).sum(1) val initialDeltas = initialRanks.map { (page) => (page._1, page._2 - INITIAL_RANK) } @@ -78,16 +78,16 @@ object DeltaPageRank { val deltas = workset.join(adjacency).where(0).equalTo(0) { (lastDeltas, adj, out: Collector[Page]) => { - val targets = adj._2; - val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length; + val targets = adj._2 + val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length for (target <- targets) { - out.collect((target, deltaPerTarget)); + out.collect((target, deltaPerTarget)) } } } .groupBy(0).sum(1) - .filter(x => Math.abs(x._2) > THRESHOLD); + .filter(x => Math.abs(x._2) > THRESHOLD) val rankUpdates = solutionSet.join(deltas).where(0).equalTo(0) { (current, delta) => (current._1, current._2 + delta._2) @@ -99,6 +99,6 @@ object DeltaPageRank { iteration.print() - env.execute("Page Rank"); + env.execute("Page Rank") } } http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala index ec31461..b39a2dd 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala @@ -146,7 +146,7 @@ object TPCHQuery10 { " Due to legal restrictions, we can not ship generated data.\n" + " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + " Usage: TPCHQuery10 <customer-csv path> <orders-csv path> " + - "<lineitem-csv path> <nation-csv path> <result path>"); + "<lineitem-csv path> <nation-csv path> <result path>") false } } http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala index 44b002e..6b8b0fd 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala @@ -143,7 +143,7 @@ object TPCHQuery3 { " Due to legal restrictions, we can not ship generated data.\n" + " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + - "<orders-csv path> <result path>"); + "<orders-csv path> <result path>") false } } http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java index 9e96c64..7d2bbaa 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java @@ -53,11 +53,12 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe } @Override - protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) { + protected MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) { String name = getName() != null ? getName() : "Map at "+defaultName; // create operator - MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name); + MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, + new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name); // set input po.setInput(input); // set dop http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index e34c660..6a4beed 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -38,8 +38,8 @@ import scala.concurrent.{TimeoutException, Await} import scala.concurrent.duration.{FiniteDuration} -class JobClient(jobManagerURL: String, timeout: FiniteDuration) extends Actor with ActorLogMessages -with ActorLogging{ +class JobClient(jobManagerURL: String, timeout: FiniteDuration) + extends Actor with ActorLogMessages with ActorLogging{ import context._ val jobManager = AkkaUtils.getReference(jobManagerURL)(system, timeout) @@ -100,9 +100,9 @@ object JobClient{ case url: String => url case _ => val jobManagerAddress = configuration.getString(ConfigConstants - .JOB_MANAGER_IPC_ADDRESS_KEY, null); + .JOB_MANAGER_IPC_ADDRESS_KEY, null) val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) if (jobManagerAddress == null) { throw new RuntimeException("JobManager address has not been specified in the " + http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 16495ca..9dbd581 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -19,7 +19,7 @@ package org.apache.flink.runtime.jobmanager import java.io.File -import java.net.{InetSocketAddress} +import java.net.InetSocketAddress import java.util.concurrent.TimeUnit import akka.actor._ @@ -35,7 +35,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.{JobException, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.instance.{InstanceManager} +import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobgraph.{JobStatus, JobID} import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} @@ -45,18 +45,18 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit, He import org.apache.flink.runtime.profiling.ProfilingUtils import org.slf4j.LoggerFactory -import scala.concurrent.{Future} +import scala.concurrent.Future import scala.concurrent.duration._ -class JobManager(val configuration: Configuration) extends -Actor with ActorLogMessages with ActorLogging { +class JobManager(val configuration: Configuration) + extends Actor with ActorLogMessages with ActorLogging { import context._ import scala.collection.JavaConverters._ implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS) - Execution.timeout = timeout; + Execution.timeout = timeout log.info(s"Starting job manager at ${self.path}.") @@ -115,7 +115,7 @@ Actor with ActorLogMessages with ActorLogging { hardwareInformation, numberOfSlots) // to be notified when the taskManager is no longer reachable - context.watch(taskManager); + context.watch(taskManager) taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort) } @@ -532,7 +532,7 @@ object JobManager { ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000 val executionRetries = configuration.getInteger(ConfigConstants - .DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES); + .DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES) val delayBetweenRetries = 2 * configuration.getLong( ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 1b7d452..16884ca 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -123,7 +123,7 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration, def stop(): Unit = { LOG.info("Stopping FlinkMiniCluster.") shutdown() - awaitTermination(); + awaitTermination() } def shutdown(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 620a436..2872678 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -96,13 +96,13 @@ import scala.collection.JavaConverters._ TaskManager.checkTempDirs(tmpDirPaths) val ioManager = new IOManagerAsync(tmpDirPaths) val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize) - val bcVarManager = new BroadcastVariableManager(); + val bcVarManager = new BroadcastVariableManager() val hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize) val fileCache = new FileCache() val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, Task]() // Actors which want to be notified once this task manager has been registered at the job manager - val waitForRegistration = scala.collection.mutable.Set[ActorRef](); + val waitForRegistration = scala.collection.mutable.Set[ActorRef]() val profiler = profilingInterval match { case Some(interval) => Some(TaskManager.startProfiler(self.path.toSerializationFormat, @@ -116,7 +116,7 @@ import scala.collection.JavaConverters._ var registrationAttempts: Int = 0 var registered: Boolean = false var currentJobManager = ActorRef.noSender - var instanceID: InstanceID = null; + var instanceID: InstanceID = null var heartbeatScheduler: Option[Cancellable] = None if (log.isDebugEnabled) { @@ -187,7 +187,7 @@ import scala.collection.JavaConverters._ jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots) } else { - log.error("TaskManager could not register at JobManager."); + log.error("TaskManager could not register at JobManager.") self ! PoisonPill } } @@ -321,7 +321,7 @@ import scala.collection.JavaConverters._ if (log.isDebugEnabled) { startRegisteringTask = System.currentTimeMillis() } - libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles()); + libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles()) if (log.isDebugEnabled) { log.debug(s"Register task ${executionID} took ${ @@ -436,11 +436,11 @@ import scala.collection.JavaConverters._ } sender ! TaskOperationResult(executionId, true) case None => sender ! TaskOperationResult(executionId, false, "No reader with ID " + - resultId + " was found."); + resultId + " was found.") } case None => sender ! TaskOperationResult(executionId, false, "No task with execution" + - "ID " + executionId + " was found."); + "ID " + executionId + " was found.") } } } @@ -598,14 +598,14 @@ object TaskManager { } val jobManagerHostname = configuration.getString(ConfigConstants - .JOB_MANAGER_IPC_ADDRESS_KEY, null); + .JOB_MANAGER_IPC_ADDRESS_KEY, null) val jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) - val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort); + val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort) val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0) - val hostname = NetUtils.resolveAddress(jobManagerAddress).getHostName; + val hostname = NetUtils.resolveAddress(jobManagerAddress).getHostName (hostname, port, configuration) } getOrElse { @@ -641,9 +641,9 @@ object TaskManager { case url: String => url case _ => val jobManagerAddress = configuration.getString(ConfigConstants - .JOB_MANAGER_IPC_ADDRESS_KEY, null); + .JOB_MANAGER_IPC_ADDRESS_KEY, null) val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) if (jobManagerAddress == null) { throw new RuntimeException("JobManager address has not been specified in the " + @@ -675,7 +675,7 @@ object TaskManager { val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize, nettyConfig) - val networkBufferMem = if (localExecution) 0 else numNetworkBuffers * pageSize; + val networkBufferMem = if (localExecution) 0 else numNetworkBuffers * pageSize val configuredMemory: Long = configuration.getInteger(ConfigConstants .TASK_MANAGER_MEMORY_SIZE_KEY, -1) http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index c8088b6..6ed7620 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -45,8 +45,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll { "recover once failing forward job" in { FailingOnceReceiver.failed = false - val sender = new AbstractJobVertex("Sender"); - val receiver = new AbstractJobVertex("Receiver"); + val sender = new AbstractJobVertex("Sender") + val receiver = new AbstractJobVertex("Receiver") sender.setInvokableClass(classOf[Tasks.Sender]) receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver]) @@ -84,8 +84,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll { "recover once failing forward job with slot sharing" in { FailingOnceReceiver.failed = false - val sender = new AbstractJobVertex("Sender"); - val receiver = new AbstractJobVertex("Receiver"); + val sender = new AbstractJobVertex("Sender") + val receiver = new AbstractJobVertex("Receiver") sender.setInvokableClass(classOf[Tasks.Sender]) receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver]) @@ -127,8 +127,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll { "recover a task manager failure" in { BlockingOnceReceiver.blocking = true - val sender = new AbstractJobVertex("Sender"); - val receiver = new AbstractJobVertex("Receiver"); + val sender = new AbstractJobVertex("Sender") + val receiver = new AbstractJobVertex("Receiver") sender.setInvokableClass(classOf[Tasks.Sender]) receiver.setInvokableClass(classOf[Tasks.BlockingOnceReceiver]) http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 49bb4e0..37b99b1 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -93,7 +93,7 @@ object TestingUtils { def startTestingTaskManagerWithConfiguration(hostname: String, config: Configuration) (implicit system: ActorSystem) = { val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) = - TaskManager.parseConfiguration(hostname, config); + TaskManager.parseConfiguration(hostname, config) system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) with TestingTaskManager)) @@ -118,7 +118,7 @@ object TestingUtils { } def setExecutionContext(context: ExecutionContext): Unit = { - AkkaUtils.globalExecutionContext = context; + AkkaUtils.globalExecutionContext = context } class QueuedActionExecutionContext(queue: ActionQueue) extends ExecutionContext { http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 0366125c..94c5461 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -33,7 +33,6 @@ import org.apache.flink.api.java.operators._ import org.apache.flink.api.java.{DataSet => JavaDataSet} import org.apache.flink.api.scala.operators.{ScalaCsvOutputFormat, ScalaAggregateOperator} import org.apache.flink.configuration.Configuration -import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.core.fs.{FileSystem, Path} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.util.Collector @@ -1105,7 +1104,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { def getKey(in: T) = cleanFun(in) } - val keyType = implicitly[TypeInformation[K]]; + val keyType = implicitly[TypeInformation[K]] val op = new PartitionOperator[T]( javaSet, http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala index 8fd4286..54374ba 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala @@ -256,9 +256,9 @@ class CoGroupDataSet[L, R]( new ImmutablePair[java.lang.Integer, Order](position, order)) case ( Right(expression), order ) => { - if (! (typeInfo.isInstanceOf[CompositeType[_]])) { + if (!typeInfo.isInstanceOf[CompositeType[_]]) { throw new InvalidProgramException("Specifying order keys via field positions is only " - + "valid for composite data types (pojo / tuple / case class)"); + + "valid for composite data types (pojo / tuple / case class)") } else { val ek = new ExpressionKeys[T](Array[String](expression), typeInfo) http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala index dbaf181..a0c0b58 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala @@ -64,7 +64,7 @@ package object scala { } } def getCallLocationName(depth: Int = 3) : String = { - val st = Thread.currentThread().getStackTrace(); + val st = Thread.currentThread().getStackTrace() if(st.length < depth) { return "<unknown>" } http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala index dbd80ea..2e3d07a 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala @@ -17,12 +17,10 @@ */ package org.apache.flink.api.scala.typeutils -import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator, -TypeSerializer} +import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase import org.apache.flink.core.memory.MemorySegment import org.apache.flink.types.{KeyFieldOutOfBoundsException, NullKeyFieldException} -; /** * Comparator for Case Classes. Access is different from http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala index e2c361f..eb2441d 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala @@ -20,7 +20,6 @@ package org.apache.flink.api.scala.typeutils import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase import org.apache.flink.core.memory.{DataOutputView, DataInputView} -; /** * Serializer for Case Classes. Creation and access is different from @@ -84,7 +83,7 @@ abstract class CaseClassSerializer[T <: Product]( } def deserialize(reuse: T, source: DataInputView): T = { - deserialize(source); + deserialize(source) } def deserialize(source: DataInputView): T = { http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala index 5468637..fa519d9 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.core.memory.{DataOutputView, DataInputView} import scala.collection.generic.CanBuildFrom -; /** * Serializer for Scala Collections. http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala index ba66695..41c96b0 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala @@ -18,19 +18,14 @@ package org.apache.flink.api.scala.functions import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.configuration.Configuration import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode -import org.apache.flink.test.util.{MultipleProgramsTestBase, JavaProgramTestBase} +import org.apache.flink.test.util.MultipleProgramsTestBase import org.junit.Assert.fail import org.junit.{After, Before, Test, Rule} import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.runners.Parameterized.Parameters - -import scala.collection.JavaConverters._ -import scala.collection.mutable import org.apache.flink.api.scala._ import org.apache.flink.api.common.InvalidProgramException http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala index 7c6bcaf..6e5296b 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala @@ -32,7 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.runtime.operators.sort.UnilateralSortMerger import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory -import org.junit.Assert._; +import org.junit.Assert._ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable class MassiveCaseClassSortingITCase { @@ -79,7 +79,7 @@ class MassiveCaseClassSortingITCase { val inputIterator = new StringTupleReader(reader) val typeInfo = implicitly[TypeInformation[StringTuple]] - .asInstanceOf[CompositeType[StringTuple]]; + .asInstanceOf[CompositeType[StringTuple]] val serializer = typeInfo.createSerializer() val comparator = typeInfo.createComparator(Array(0, 1), Array(true, true), 0) @@ -99,7 +99,7 @@ class MassiveCaseClassSortingITCase { val verifyIterator = new StringTupleReader(verifyReader) var num = 0 - var hasMore = true; + var hasMore = true while (hasMore) { val next = verifyIterator.next(null) @@ -127,7 +127,7 @@ class MassiveCaseClassSortingITCase { } assertNull(sortedData.next(null)) - assertEquals(NUM_STRINGS, num); + assertEquals(NUM_STRINGS, num) } finally { if (reader != null) { @@ -200,7 +200,7 @@ class MassiveCaseClassSortingITCase { object MassiveCaseClassSortingITCase { def main(args: Array[String]) { - new MassiveCaseClassSortingITCase().testStringTuplesSorting; + new MassiveCaseClassSortingITCase().testStringTuplesSorting() } } http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala index 1d806c2..ce3bba3 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala @@ -712,7 +712,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo "2,5,0,Hallo Welt-Hallo Welt wie,1\n" + "3,15,0,BCD-ABC-Hallo Welt wie gehts?,2\n" + "4,34,0,FGH-CDE-EFG-DEF,1\n" + - "5,65,0,IJK-HIJ-KLM-JKL-GHI,1\n"; + "5,65,0,IJK-HIJ-KLM-JKL-GHI,1\n" } http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala index d150e85..4718395 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala @@ -48,7 +48,7 @@ class CaseClassComparatorTest { val typeInfo = implicitly[TypeInformation[CaseTestClass]] .asInstanceOf[CompositeType[CaseTestClass]] - val serializer = typeInfo.createSerializer(); + val serializer = typeInfo.createSerializer() val comparator = new FailingCompareDeserializedWrapper( typeInfo.createComparator(Array[Int](0, 2), Array[Boolean](true, true), 0)) @@ -98,7 +98,7 @@ class CaseClassComparatorTest { catch { case e: Exception => { e.printStackTrace() - fail(e.getMessage()); + fail(e.getMessage()) } } } @@ -120,7 +120,7 @@ class CaseClassComparatorTest { def compare(first: T, second: T): Int = wrapped.compare(first, second) def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = { - throw new UnsupportedOperationException("Not Supported"); + throw new UnsupportedOperationException("Not Supported") } def supportsNormalizedKey(): Boolean = wrapped.supportsNormalizedKey() http://git-wip-us.apache.org/repos/asf/flink/blob/58e1e443/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala index 37e334b..d61edff 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala @@ -152,12 +152,12 @@ class KryoGenericTypeSerializerTest { class LocalDateSerializer extends Serializer[LocalDate] with java.io.Serializable { override def write(kryo: Kryo, output: Output, obj: LocalDate) { - output.writeInt(obj.getYear()); - output.writeInt(obj.getMonthOfYear()); - output.writeInt(obj.getDayOfMonth()); + output.writeInt(obj.getYear()) + output.writeInt(obj.getMonthOfYear()) + output.writeInt(obj.getDayOfMonth()) } override def read(kryo: Kryo, input: Input, typeClass: Class[LocalDate]) : LocalDate = { - new LocalDate(input.readInt(), input.readInt(), input.readInt()); + new LocalDate(input.readInt(), input.readInt(), input.readInt()) } }
