Repository: flink
Updated Branches:
  refs/heads/master 063b1092b -> 361947d6c


[FLINK-2619] [tests] Fix for some unexecuted Scala tests

This closes #1103


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9edd9a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9edd9a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9edd9a8

Branch: refs/heads/master
Commit: c9edd9a88ab20bc314470fb3eb7da4c643dd7d57
Parents: 063b109
Author: Chiwan Park <chiwanp...@apache.org>
Authored: Mon Sep 7 23:03:56 2015 +0900
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 9 18:03:13 2015 +0200

----------------------------------------------------------------------
 .../ExecutionGraphRestartTest.scala             |   5 +-
 .../TaskManagerLossFailsTasksTest.scala         |   5 +-
 .../jobmanager/JobManagerRegistrationTest.scala |   4 +-
 .../test/operations/GraphOperationsITCase.scala |   1 +
 .../flink/api/scala/ScalaShellITSuite.scala     |  85 +++----
 .../manual/MassiveCaseClassSortingITCase.scala  | 243 +++++++++++++++++++
 .../misc/MassiveCaseClassSortingITCase.scala    | 240 ------------------
 7 files changed, 298 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
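Background on the fix: ScalaTest suites (WordSpecLike, FunSuite) are not JUnit test
classes, so Maven Surefire silently skips them unless they are bridged to JUnit.
Four of the suites below gain @RunWith(classOf[JUnitRunner]) for exactly this
reason. A minimal sketch of the pattern follows; the suite name and assertion are
illustrative, not part of this commit:

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{Matchers, WordSpecLike}

// Without the annotation, Surefire never discovers this ScalaTest suite.
@RunWith(classOf[JUnitRunner])
class ExampleSpec extends WordSpecLike with Matchers {

  "An annotated suite" must {
    "run under the JUnit runner" in {
      (1 + 1) should equal (2)
    }
  }
}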


http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index 6060bc3..434a8cb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -26,8 +26,11 @@ import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
 import org.scalatest.{Matchers, WordSpecLike}
 
+@RunWith(classOf[JUnitRunner])
 class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
 
   val NUM_TASKS = 31
@@ -118,7 +121,7 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
         }
 
         eg.getState should equal(JobStatus.FINISHED)
-      }catch{
+      } catch {
         case t: Throwable =>
           t.printStackTrace()
           fail(t.getMessage)

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 3d0d3a5..177dc85 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -26,8 +26,11 @@ import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
 import org.scalatest.{Matchers, WordSpecLike}
 
+@RunWith(classOf[JUnitRunner])
 class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
 
   "A task manager loss" must {
@@ -64,7 +67,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
 
         instance1.markDead()
         eg.getState should equal(JobStatus.FAILING)
-      }catch{
+      } catch {
         case t:Throwable =>
           t.printStackTrace()
           fail(t.getMessage)

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 4368309..7487670 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -27,10 +27,11 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
-import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
 AlreadyRegistered, RegisterTaskManager}
 import org.junit.Assert.{assertNotEquals, assertNotNull}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
 import scala.concurrent.duration._
@@ -41,6 +42,7 @@ import scala.language.postfixOps
  * It also tests the JobManager's response to heartbeats from TaskManagers it does
  * not know.
  */
+@RunWith(classOf[JUnitRunner])
 class JobManagerRegistrationTest(_system: ActorSystem) extends TestKit(_system) with
 ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
index d49e565..713eb8d 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.graph.scala.test.operations
 
 import org.apache.flink.api.common.functions.FilterFunction
 import org.apache.flink.api.scala._
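
The gelly-scala fix is a one-liner: the file had no package declaration, and a
Scala class's package comes solely from that declaration, not from the file's
location on disk, so the class was compiled into the default package rather than
org.apache.flink.graph.scala.test.operations (a plausible reason the suite went
unexecuted; the commit does not spell out the mechanism). A tiny illustration,
with a hypothetical class name:

package org.apache.flink.graph.scala.test.operations

class PackagedExample {
  // Reflects the declaration above, i.e.
  // "org.apache.flink.graph.scala.test.operations.PackagedExample"
  def fullyQualifiedName: String = getClass.getName
}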

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index 9717ae7..e932cd2 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -22,17 +22,20 @@ import java.io._
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.test.util.{TestEnvironment, TestBaseUtils, ForkableFlinkMiniCluster, FlinkTestBase}
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite, Matchers}
+import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import scala.concurrent.duration.FiniteDuration
 import scala.tools.nsc.Settings
 
+@RunWith(classOf[JUnitRunner])
 class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
 
   test("Iteration test with iterative Pi example") {
 
-    val input : String =
+    val input: String =
       """
         val initial = env.fromElements(0)
 
@@ -46,9 +49,9 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
         }
         val result = count map { c => c / 10000.0 * 4 }
         result.collect()
-    """.stripMargin
+      """.stripMargin
 
-    val output : String = processInShell(input)
+    val output: String = processInShell(input)
 
     output should not include "failed"
     output should not include "error"
@@ -56,7 +59,8 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
   }
 
   test("WordCount in Shell") {
-    val input = """
+    val input =
+      """
        val text = env.fromElements("To be, or not to be,--that is the question:--",
         "Whether 'tis nobler in the mind to suffer",
         "The slings and arrows of outrageous fortune",
@@ -64,7 +68,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
 
        val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
         val result = counts.print()
-    """.stripMargin
+      """.stripMargin
 
     val output = processInShell(input)
 
@@ -72,7 +76,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     output should not include "error"
     output should not include "Exception"
 
-//    some of the words that should be included
+    // some of the words that should be included
     output should include("(a,1)")
     output should include("(whether,1)")
     output should include("(to,4)")
@@ -80,14 +84,14 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
   }
 
   test("Sum 1..10, should be 55") {
-    val input : String =
+    val input =
       """
         val input: DataSet[Int] = env.fromElements(0,1,2,3,4,5,6,7,8,9,10)
         val reduced = input.reduce(_+_)
         reduced.print
       """.stripMargin
 
-    val output : String = processInShell(input)
+    val output = processInShell(input)
 
     output should not include "failed"
     output should not include "error"
@@ -97,7 +101,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
   }
 
   test("WordCount in Shell with custom case class") {
-    val input : String =
+    val input =
       """
       case class WC(word: String, count: Int)
 
@@ -111,7 +115,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
       reduced.print()
       """.stripMargin
 
-    val output : String = processInShell(input)
+    val output = processInShell(input)
 
     output should not include "failed"
     output should not include "error"
@@ -120,11 +124,9 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     output should include("WC(hello,1)")
     output should include("WC(world,10)")
   }
-  
-  
+
   test("Submit external library") {
-    
-    val input : String =
+    val input =
       """
         import org.apache.flink.ml.math._
         val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0))
@@ -132,12 +134,14 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
       """.stripMargin
 
     // find jar file that contains the ml code
-    var externalJar : String = ""
-    var folder : File = new File("../flink-ml/target/");
-    var listOfFiles : Array[File] = folder.listFiles();
-    for(i <- 0 to listOfFiles.length - 1){
-      var filename : String = listOfFiles(i).getName();
-      if(!filename.contains("test") && !filename.contains("original") && filename.contains(".jar")){
+    var externalJar = ""
+    val folder = new File("../flink-ml/target/")
+    val listOfFiles = folder.listFiles()
+
+    for (i <- listOfFiles.indices) {
+      val filename: String = listOfFiles(i).getName
+      if (!filename.contains("test") && !filename.contains("original") && filename.contains(
+        ".jar")) {
         println("ive found file:" + listOfFiles(i).getAbsolutePath)
         externalJar = listOfFiles(i).getAbsolutePath
       }
@@ -145,13 +149,13 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
 
     assert(externalJar != "")
 
-    val output : String = processInShell(input,Option(externalJar))
+    val output: String = processInShell(input, Option(externalJar))
 
     output should not include "failed"
     output should not include "error"
     output should not include "Exception"
 
-    output should include( "\nDenseVector(1.0, 2.0, 3.0)")
+    output should include("\nDenseVector(1.0, 2.0, 3.0)")
   }
 
   /**
@@ -159,8 +163,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
    * @param input commands to be processed in the shell
    * @return output of shell
    */
-  def processInShell(input : String, externalJars : Option[String] = None): String ={
-
+  def processInShell(input: String, externalJars: Option[String] = None): String = {
     val in = new BufferedReader(new StringReader(input + "\n"))
     val out = new StringWriter()
     val baos = new ByteArrayOutputStream()
@@ -174,28 +177,26 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
       case Some(c) => c.getLeaderRPCPort
       case _ => throw new RuntimeException("Test cluster not initialized.")
     }
-    
-    var repl : FlinkILoop= null 
-    
-    externalJars match {
-      case Some(ej) => repl = new FlinkILoop(
-        host, port,  
-        Option(Array(ej)), 
+
+    val repl = externalJars match {
+      case Some(ej) => new FlinkILoop(
+        host, port,
+        Option(Array(ej)),
+        in, new PrintWriter(out))
+
+      case None => new FlinkILoop(
+        host, port,
         in, new PrintWriter(out))
-        
-      case None => repl = new FlinkILoop(
-        host,port,
-        in,new PrintWriter(out))
     }
-    
+
     repl.settings = new Settings()
 
     // enable this line to use scala in intellij
     repl.settings.usejavacp.value = true
-    
+
     externalJars match {
       case Some(ej) => repl.settings.classpath.value = ej
-      case None => 
+      case None =>
     }
 
     repl.process(repl.settings)
@@ -205,7 +206,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     System.setOut(oldOut)
 
     baos.flush()
-    
+
     val stdout = baos.toString
 
     out.toString + stdout
@@ -230,6 +231,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
   }
 
   override def afterAll(): Unit = {
-    cluster.map(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
+    cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
   }
 }
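
Beyond the JUnitRunner annotation, the ScalaShellITSuite cleanup applies two small
Scala idioms: repl is now initialized from a match expression instead of a
null-initialized var, and afterAll calls cluster.foreach rather than cluster.map,
since only the side effect matters. A self-contained sketch of both idioms, with
illustrative names (not Flink API):

object ShellSuiteIdioms {
  def main(args: Array[String]): Unit = {
    val externalJars: Option[String] = Some("/tmp/example.jar")

    // A match used as an expression binds a val directly; no "var x = null".
    val classpath: Array[String] = externalJars match {
      case Some(jar) => Array(jar)
      case None      => Array.empty[String]
    }
    println(classpath.mkString(":"))

    // foreach runs a side effect; map would build a useless Option[Unit].
    val cluster: Option[String] = Some("test-cluster")
    cluster.foreach(name => println("stopping " + name))
  }
}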

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
new file mode 100644
index 0000000..7385fa2
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.manual
+
+import java.io.File
+import java.util.Random
+import java.io.BufferedWriter
+import java.io.FileWriter
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.scala._
+import java.io.BufferedReader
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
+import java.io.FileReader
+import org.apache.flink.util.MutableObjectIterator
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory
+import org.junit.Assert._
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
+
+/**
+ * This test is written as a manual test.
+ */
+class MassiveCaseClassSortingITCase {
+  
+  val SEED : Long = 347569784659278346L
+  
+  def testStringTuplesSorting() {
+    
+    val NUM_STRINGS = 3000000
+    var input: File = null
+    var sorted: File = null
+    
+    try {
+      input = generateFileWithStringTuples(NUM_STRINGS,
+                                           "http://some-uri.com/that/is/a/common/prefix/to/all")
+        
+      sorted = File.createTempFile("sorted_strings", "txt")
+      
+      val command = Array("/bin/bash", "-c", "export LC_ALL=\"C\" && cat \""
+                        + input.getAbsolutePath + "\" | sort > \"" + sorted.getAbsolutePath + "\"")
+
+      var p: Process = null
+      try {
+        p = Runtime.getRuntime.exec(command)
+        val retCode = p.waitFor()
+        if (retCode != 0) {
+          throw new Exception("Command failed with return code " + retCode)
+        }
+        p = null
+      }
+      finally {
+        if (p != null) {
+          p.destroy()
+        }
+      }
+      
+      var sorter: UnilateralSortMerger[StringTuple] = null
+      
+      var reader: BufferedReader = null
+      var verifyReader: BufferedReader = null
+      
+      try {
+        reader = new BufferedReader(new FileReader(input))
+        val inputIterator = new StringTupleReader(reader)
+        
+        val typeInfo = implicitly[TypeInformation[StringTuple]]
+          .asInstanceOf[CompositeType[StringTuple]]
+        
+        val serializer = typeInfo.createSerializer(new ExecutionConfig)
+        val comparator = typeInfo.createComparator(
+          Array(0, 1),
+          Array(true, true),
+          0,
+          new ExecutionConfig)
+        
+        val mm = new MemoryManager(1024 * 1024, 1)
+        val ioMan = new IOManagerAsync()
+        
+        sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator,
+              new DummyInvokable(),
+              new RuntimeSerializerFactory[StringTuple](serializer, classOf[StringTuple]),
+              comparator, 1.0, 4, 0.8f)
+            
+        val sortedData = sorter.getIterator
+        reader.close()
+        
+        verifyReader = new BufferedReader(new FileReader(sorted))
+        val verifyIterator = new StringTupleReader(verifyReader)
+        
+        var num = 0
+        var hasMore = true
+        
+        while (hasMore) {
+          val next = verifyIterator.next(null)
+          
+          if (next != null ) {
+            num += 1
+            
+            val nextFromFlinkSort = sortedData.next(null)
+            
+            assertNotNull(nextFromFlinkSort)
+            
+            assertEquals(next.key1, nextFromFlinkSort.key1)
+            assertEquals(next.key2, nextFromFlinkSort.key2)
+            
+            // assert array equals does not work here
+            assertEquals(next.value.length, nextFromFlinkSort.value.length)
+            for (i <- 0 until next.value.length) {
+              assertEquals(next.value(i), nextFromFlinkSort.value(i))
+            }
+            
+          }
+          else {
+            hasMore = false
+          }
+        }
+        
+        assertNull(sortedData.next(null))
+        assertEquals(NUM_STRINGS, num)
+      }
+      finally {
+        if (reader != null) {
+          reader.close()
+        }
+        if (verifyReader != null) {
+          verifyReader.close()
+        }
+        if (sorter != null) {
+          sorter.close()
+        }
+      }
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        e.getMessage
+      }
+    }
+    finally {
+      if (input != null) {
+        input.delete()
+      }
+      if (sorted != null) {
+        sorted.delete()
+      }
+    }
+  }
+  
+  
+  private def generateFileWithStringTuples(numStrings: Int, prefix: String): File = {
+    val rnd = new Random(SEED)
+    val bld = new StringBuilder()
+    val f = File.createTempFile("strings", "txt")
+    
+    var wrt: BufferedWriter = null
+    
+    try {
+      wrt = new BufferedWriter(new FileWriter(f))
+
+      for (i <- 0 until numStrings) {
+        bld.setLength(0)
+        val numComps = rnd.nextInt(5) + 2
+        
+        for (z <- 0 until numComps) {
+          if (z > 0) {
+            bld.append(' ')
+          }
+          bld.append(prefix)
+          val len = rnd.nextInt(20) + 10
+          
+          for (k <- 0 until len) {
+            val c = (rnd.nextInt(80) + 40).toChar
+            bld.append(c)
+          }
+        }
+        val str = bld.toString
+        wrt.write(str)
+        wrt.newLine()
+      }
+    }
+    finally {
+      wrt.close()
+    }
+    f
+  }
+}
+
+object MassiveCaseClassSortingITCase {
+  
+  def main(args: Array[String]) {
+    new MassiveCaseClassSortingITCase().testStringTuplesSorting()
+  }
+}
+
+case class StringTuple(key1: String, key2: String, value: Array[String])
+  
+class StringTupleReader(val reader: BufferedReader) extends MutableObjectIterator[StringTuple] {
+  
+  override def next(reuse: StringTuple): StringTuple = {
+    val line = reader.readLine()
+    if (line == null) {
+      return null
+    }
+    val parts = line.split(" ")
+    StringTuple(parts(0), parts(1), parts)
+  }
+
+  override def next(): StringTuple = {
+    val line = reader.readLine()
+    if (line == null) {
+      return null
+    }
+    val parts = line.split(" ")
+    StringTuple(parts(0), parts(1), parts)
+  }
+
+}
+
+class DummyInvokable extends AbstractInvokable {
+
+  override def registerInputOutput() = {}
+  override def invoke() = {}
+}
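
The relocated test (now under manual, reflecting that it is driven by its main
method rather than by the build) checks Flink's UnilateralSortMerger against an
external oracle: the input is sorted by Unix sort(1) under LC_ALL=C, then both
orderings are compared record by record. A toy version of that oracle pattern,
with Scala's built-in sort standing in for both sides:

object SortOracleSketch {
  def main(args: Array[String]): Unit = {
    val rnd = new java.util.Random(42)
    val data = Seq.fill(1000)(rnd.nextInt(10000).toString)

    val reference = data.sorted          // trusted oracle ordering
    val underTest = data.sortWith(_ < _) // stand-in for the sorter being verified

    // Compare element by element, as the test above does with its two readers.
    reference.zip(underTest).zipWithIndex.foreach { case ((exp, got), i) =>
      assert(exp == got, "mismatch at index " + i + ": expected " + exp + ", got " + got)
    }
    println("orderings match for " + data.size + " elements")
  }
}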

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
deleted file mode 100644
index dd27eb5..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.misc
-
-import java.io.File
-import java.util.Random
-import java.io.BufferedWriter
-import java.io.FileWriter
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.scala._
-import java.io.BufferedReader
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
-import java.io.FileReader
-import org.apache.flink.util.MutableObjectIterator
-import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger
-import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory
-import org.junit.Assert._
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
-
-class MassiveCaseClassSortingITCase {
-  
-  val SEED : Long = 347569784659278346L
-  
-  def testStringTuplesSorting() {
-    
-    val NUM_STRINGS = 3000000
-    var input: File = null
-    var sorted: File = null
-    
-    try {
-      input = generateFileWithStringTuples(NUM_STRINGS,
-                                           "http://some-uri.com/that/is/a/common/prefix/to/all")
-        
-      sorted = File.createTempFile("sorted_strings", "txt")
-      
-      val command = Array("/bin/bash", "-c", "export LC_ALL=\"C\" && cat \""
-                        + input.getAbsolutePath + "\" | sort > \"" + sorted.getAbsolutePath + "\"")
-
-      var p: Process = null
-      try {
-        p = Runtime.getRuntime.exec(command)
-        val retCode = p.waitFor()
-        if (retCode != 0) {
-          throw new Exception("Command failed with return code " + retCode)
-        }
-        p = null
-      }
-      finally {
-        if (p != null) {
-          p.destroy()
-        }
-      }
-      
-      var sorter: UnilateralSortMerger[StringTuple] = null
-      
-      var reader: BufferedReader = null
-      var verifyReader: BufferedReader = null
-      
-      try {
-        reader = new BufferedReader(new FileReader(input))
-        val inputIterator = new StringTupleReader(reader)
-        
-        val typeInfo = implicitly[TypeInformation[StringTuple]]
-          .asInstanceOf[CompositeType[StringTuple]]
-        
-        val serializer = typeInfo.createSerializer(new ExecutionConfig)
-        val comparator = typeInfo.createComparator(
-          Array(0, 1),
-          Array(true, true),
-          0,
-          new ExecutionConfig)
-        
-        val mm = new MemoryManager(1024 * 1024, 1)
-        val ioMan = new IOManagerAsync()
-        
-        sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator,
-              new DummyInvokable(),
-              new RuntimeSerializerFactory[StringTuple](serializer, classOf[StringTuple]),
-              comparator, 1.0, 4, 0.8f)
-            
-        val sortedData = sorter.getIterator
-        reader.close()
-        
-        verifyReader = new BufferedReader(new FileReader(sorted))
-        val verifyIterator = new StringTupleReader(verifyReader)
-        
-        var num = 0
-        var hasMore = true
-        
-        while (hasMore) {
-          val next = verifyIterator.next(null)
-          
-          if (next != null ) {
-            num += 1
-            
-            val nextFromFlinkSort = sortedData.next(null)
-            
-            assertNotNull(nextFromFlinkSort)
-            
-            assertEquals(next.key1, nextFromFlinkSort.key1)
-            assertEquals(next.key2, nextFromFlinkSort.key2)
-            
-            // assert array equals does not work here
-            assertEquals(next.value.length, nextFromFlinkSort.value.length)
-            for (i <- 0 until next.value.length) {
-              assertEquals(next.value(i), nextFromFlinkSort.value(i))
-            }
-            
-          }
-          else {
-            hasMore = false
-          }
-        }
-        
-        assertNull(sortedData.next(null))
-        assertEquals(NUM_STRINGS, num)
-      }
-      finally {
-        if (reader != null) {
-          reader.close()
-        }
-        if (verifyReader != null) {
-          verifyReader.close()
-        }
-        if (sorter != null) {
-          sorter.close()
-        }
-      }
-    }
-    catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        e.getMessage
-      }
-    }
-    finally {
-      if (input != null) {
-        input.delete()
-      }
-      if (sorted != null) {
-        sorted.delete()
-      }
-    }
-  }
-  
-  
-  private def generateFileWithStringTuples(numStrings: Int, prefix: String): File = {
-    val rnd = new Random(SEED)
-    val bld = new StringBuilder()
-    val f = File.createTempFile("strings", "txt")
-    
-    var wrt: BufferedWriter = null
-    
-    try {
-      wrt = new BufferedWriter(new FileWriter(f))
-
-      for (i <- 0 until numStrings) {
-        bld.setLength(0)
-        val numComps = rnd.nextInt(5) + 2
-        
-        for (z <- 0 until numComps) {
-          if (z > 0) {
-            bld.append(' ')
-          }
-          bld.append(prefix)
-          val len = rnd.nextInt(20) + 10
-          
-          for (k <- 0 until len) {
-            val c = (rnd.nextInt(80) + 40).toChar
-            bld.append(c)
-          }
-        }
-        val str = bld.toString
-        wrt.write(str)
-        wrt.newLine()
-      }
-    }
-    finally {
-      wrt.close()
-    }
-    f
-  }
-}
-
-object MassiveCaseClassSortingITCase {
-  
-  def main(args: Array[String]) {
-    new MassiveCaseClassSortingITCase().testStringTuplesSorting()
-  }
-}
-
-case class StringTuple(key1: String, key2: String, value: Array[String])
-  
-class StringTupleReader(val reader: BufferedReader) extends MutableObjectIterator[StringTuple] {
-  
-  override def next(reuse: StringTuple): StringTuple = {
-    val line = reader.readLine()
-    if (line == null) {
-      return null
-    }
-    val parts = line.split(" ")
-    StringTuple(parts(0), parts(1), parts)
-  }
-
-  override def next(): StringTuple = {
-    val line = reader.readLine()
-    if (line == null) {
-      return null
-    }
-    val parts = line.split(" ")
-    StringTuple(parts(0), parts(1), parts)
-  }
-
-}
-
-class DummyInvokable extends AbstractInvokable {
-
-  override def registerInputOutput() = {}
-  override def invoke() = {}
-}
