Platon Potapov created SPARK-6232:
-------------------------------------

             Summary: Spark Streaming: simple application stalls processing
                 Key: SPARK-6232
                 URL: https://issues.apache.org/jira/browse/SPARK-6232
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.2.1
         Environment: Ubuntu, MacOS.
            Reporter: Platon Potapov
            Priority: Critical


Below is a snippet of a simple test application. Run it in one terminal window, and "nc -lk 9999" in another. Once per second, enter a number (so that the window slides over several non-empty RDDs). Entering 2-3 numbers is enough for the program to stall, with the following output:

{code}
-------------------------------------------
Time: 1425922369000 ms
-------------------------------------------

-------------------------------------------
Time: 1425922370000 ms
-------------------------------------------
(1.0,4.0)

-------------------------------------------
Time: 1425922371000 ms
-------------------------------------------
(1.0,4.0)

[Stage 17:=============================>                             (1 + 0) / 2]
{code}

We tried both standalone (local master) and clustered setups; the stall reproduces in all cases. We also tried both raw sockets and Kafka as the receiver; it reproduces in both cases.

NOTE that the bug does NOT reproduce under any of the following conditions:
* the receiver is a queue (StreamingContext.queueStream) -- see the sketch after the code listing
* the commented-out "print" below is un-commented
* the window + reduceByKey combination is substituted with reduceByKeyAndWindow -- also sketched below

Here is the simple test application:

{code}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming._

object SparkStreamingTest extends App {

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingTest")
  val ssc = new StreamingContext(sparkConf, Seconds(1))

  val lines0 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
  val words = lines0.map(x => (1.0, x.toDouble))

  // words.print() // TODO: enable this print to avoid the program freeze

  val windowed = words.window(Seconds(4), Seconds(1))
  val grouped = windowed.reduceByKey(_ + _)
  grouped.print()

  ssc.start()
  ssc.awaitTermination()
}
{code}
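For reference, a minimal sketch of the queueStream variant under which the stall does not occur. This is an assumed setup, not code from the original report: it reuses the same StreamingContext ssc, and the queue name and RDD contents are illustrative.

{code}
// Workaround sketch (illustrative setup): feed the same (Double, Double)
// pairs from a queue of RDDs instead of a socket receiver. With this
// input source the window + reduceByKey pipeline did not stall for us.
import scala.collection.mutable
import org.apache.spark.rdd.RDD

val rddQueue = new mutable.Queue[RDD[(Double, Double)]]()
val words = ssc.queueStream(rddQueue)  // replaces socketTextStream + map

// push one small RDD per batch to simulate the "one number per second" input
rddQueue += ssc.sparkContext.makeRDD(Seq((1.0, 2.0)))
{code}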
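And a minimal sketch of the reduceByKeyAndWindow substitution mentioned above, assuming the three-argument overload (reduce function, window duration, slide duration) and the same variable names as the test application. This is the workaround we used, not a fix:

{code}
// Workaround sketch: replace the separate window() + reduceByKey() pair
// with a single reduceByKeyAndWindow over the same 4-second window,
// sliding every second. Names (words, grouped) match the test app above.
val grouped = words.reduceByKeyAndWindow(
  (a: Double, b: Double) => a + b,  // associative reduce over values
  Seconds(4),                       // window duration
  Seconds(1))                       // slide duration
grouped.print()
{code}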