Platon Potapov created SPARK-6232:
-------------------------------------

             Summary: Spark Streaming: simple application stalls processing
                 Key: SPARK-6232
                 URL: https://issues.apache.org/jira/browse/SPARK-6232
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.2.1
         Environment: Ubuntu, MacOS.

            Reporter: Platon Potapov
            Priority: Critical


Below is a simple test application.
Run it in one terminal window, and run "nc -lk 9999" in another.

Once per second, enter a number (so that the window slides over several 
non-empty RDDs). Entering 2-3 numbers is enough for the program to stall 
with the following output:

{code}
-------------------------------------------
Time: 1425922369000 ms
-------------------------------------------

-------------------------------------------
Time: 1425922370000 ms
-------------------------------------------
(1.0,4.0)

-------------------------------------------
Time: 1425922371000 ms
-------------------------------------------
(1.0,4.0)

[Stage 17:=============================>                            (1 + 0) / 2]
{code}

We have tried both a standalone (local master) setup and a clustered setup; the 
issue reproduces in all cases. We have also tried both a raw socket and Kafka 
as the receiver; it reproduces in both cases.

NOTE that the bug does not reproduce under any of the following conditions:
* the input comes from a queue (StreamingContext.queueStream)
* the commented-out "print" call is uncommented
* the window + reduceByKey combination is replaced with reduceByKeyAndWindow (see the sketch after the application code below)

Here is the simple test application:

{code}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming._

object SparkStreamingTest extends App {

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingTest")
  val ssc = new StreamingContext(sparkConf, Seconds(1))

  val lines0 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
  val words = lines0.map(x => (1.0, x.toDouble))
  // words.print() // TODO: enable this print to avoid the program freeze

  // 4-second window sliding every 1 second, then a per-key sum
  val windowed = words.window(Seconds(4), Seconds(1))
  val grouped = windowed.reduceByKey(_ + _)
  grouped.print()

  ssc.start()
  ssc.awaitTermination()
}
{code}
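
For reference, here is a minimal sketch of the reduceByKeyAndWindow substitution mentioned in the note above. It is an illustration of the described workaround, not a tested fix; only the window/reduce lines of the application change, the rest of the program stays as-is:

{code}
  // Hedged sketch: replace the window(...) + reduceByKey(_ + _) pair with a
  // single reduceByKeyAndWindow call, which per the note above does not stall.
  val grouped = words.reduceByKeyAndWindow(
    (a: Double, b: Double) => a + b, // same associative sum as reduceByKey(_ + _)
    Seconds(4),                      // window duration, matching window(Seconds(4), ...)
    Seconds(1))                      // slide duration, matching the 1-second batch interval
  grouped.print()
{code}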



