Hi all, I've got a problem with Spark Streaming (both 1.3.1 and 1.5). The situation: a component receives a DStream of URLs. For each URL it should fetch the content, extract several data elements, and pass those on for further processing.
The relevant code looks like this:

    ...
    val urls: DStream[HttpRequest] = ...

    val documents = urls.flatMap { url =>
      val docs: Seq[(Label, Document)] = fetcher.retrieveContent(url)
      System.err.println("D1: " + docs.size + " " + docs.map(_._2.source.timestamp))
      docs
    }

    documents.count().foreachRDD { rdd =>
      System.err.println("D2: " + rdd.collect().toList)
    }

    // write content to Kafka
    documents.foreachRDD { rdd =>
      rdd.foreachPartition { rddPartition =>
        val docs = rddPartition.toList
        System.err.println("D3:" + docs.map(_._2.source.timestamp))
        val messages = docs.map { t => ("raw", t._1, t._2) }
        Kafka.getSink(zkConfig).accept(messages)
      }
    }
    ...

I see the following output when I run this in Spark's local mode (irrelevant parts cut; "timestamp" is a unique sequence number I use to track documents):

    D2: List(0)
    D3:List()
    D1: 10 List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    D2: List(10)
    D1: 10 List(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    D3:List(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    D1: 10 List(21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
    D1: 10 List(31, 32, 33, 34, 35, 36, 37, 38, 39, 40)
    D1: 10 List(41, 42, 43, 44, 45, 46, 47, 48, 49, 50)
    D1: 10 List(51, 52, 53, 54, 55, 56, 57, 58, 59, 60)
    D2: List(30)
    D1: 10 List(61, 62, 63, 64, 65, 66, 67, 68, 69, 70)
    D1: 10 List(71, 72, 73, 74, 75, 76, 77, 78, 79, 80)
    D1: 10 List(81, 82, 83, 84, 85, 86, 87, 88, 89, 90)
    D3:List(61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90)
    D1: 10 List(91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
    D1: 0 List()
    D2: List(0)
    D1: 0 List()
    D3:List()
    D1: 0 List()
    D2: List(0)
    D3:List()

When I look at the D1 lines (printed inside the flatMap function), I count 10 batches of 10 documents each, which is exactly as expected. When I add up the D2 and D3 lines, however (after the flatMap), only 40 documents arrive. A document in my case is a (key, value) tuple; the key objects happen to be the same for all records here.
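One thing I'm unsure about: documents is consumed by two output operations (the count().foreachRDD and the Kafka foreachRDD), and I never call documents.cache(), so the flatMap — and with it the HTTP fetch — might be re-executed once per action. What I mean by recomputation can be illustrated without Spark at all; here is a plain-Scala sketch (names and counts are mine, the fetch is simulated by a counter):

```scala
object LazyRecompute {
  var fetchCount = 0

  // A lazy view stands in for an uncached RDD: the side-effecting "fetch"
  // inside the transformation is not memoised, so it re-runs every time
  // the collection is traversed.
  val documents = (1 to 3).view.map { url =>
    fetchCount += 1            // pretend this is fetcher.retrieveContent(url)
    s"doc-$url"
  }

  def main(args: Array[String]): Unit = {
    documents.toList           // first traversal, like the count() action
    documents.toList           // second traversal, like the Kafka foreachRDD
    // The fetch ran twice per URL (6 times for 3 URLs), and with a
    // non-deterministic fetch the two traversals could see different data.
    println(s"fetch ran $fetchCount times for 3 URLs")
  }
}
```

If that is indeed what's happening in my job, would adding documents.cache() right after the flatMap be the idiomatic fix?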
Does anyone have an idea what might be happening to my other 60 documents? Thank you so much in advance! Regards, Jeffrey