Hi all,
I've got a problem with Spark Streaming (observed on both 1.3.1 and 1.5). The
situation is as follows:
There is a component which gets a DStream of URLs. For each of these URLs,
it should access it, retrieve several data elements and pass those on for
further processing.

The relevant code looks like this:
...
    val urls: DStream[HttpRequest] = ...

    val documents = urls.flatMap { url =>
      val docs: Seq[(Label, Document)] = fetcher.retrieveContent(url)
      System.err.println("D1: " + docs.size + " " + docs.map(_._2.source.timestamp))
      docs
    }

    documents.count().foreachRDD { rdd =>
      System.err.println("D2: " + rdd.collect().toList)
    }

    // write the content to Kafka
    documents.foreachRDD { rdd =>
      rdd.foreachPartition { rddPartition =>
        val docs = rddPartition.toList
        System.err.println("D3:" + docs.map(_._2.source.timestamp))
        val messages = docs.map { t => ("raw", t._1, t._2) }
        Kafka.getSink(zkConfig).accept(messages)
      }
    }
...

I see the following output when I run this in Spark's local mode (irrelevant
parts cut; "timestamp" is a unique sequence number used to track documents):
D2: List(0)
D3:List()
D1: 10 List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
D2: List(10)
D1: 10 List(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
D3:List(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
D1: 10 List(21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
D1: 10 List(31, 32, 33, 34, 35, 36, 37, 38, 39, 40)
D1: 10 List(41, 42, 43, 44, 45, 46, 47, 48, 49, 50)
D1: 10 List(51, 52, 53, 54, 55, 56, 57, 58, 59, 60)
D2: List(30)
D1: 10 List(61, 62, 63, 64, 65, 66, 67, 68, 69, 70)
D1: 10 List(71, 72, 73, 74, 75, 76, 77, 78, 79, 80)
D1: 10 List(81, 82, 83, 84, 85, 86, 87, 88, 89, 90)
D3:List(61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77,
78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90)
D1: 10 List(91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
D1: 0 List()
D2: List(0)
D1: 0 List()
D3:List()
D1: 0 List()
D2: List(0)
D3:List()

When I look at the D1 lines (printed inside the flatMap function), I count 10
batches of 10 documents, which is exactly as expected.
When I tally the D2 lines (printed after the flatMap function), however, they
account for only 40 documents.
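To make that tally explicit, here is the arithmetic in plain Scala, summing the per-batch counts printed on the D2 lines of the log above:

```scala
object D2Tally extends App {
  // Per-batch counts reported by the five D2 lines in the log above
  val d2Counts = List(0, 10, 30, 0, 0)

  // D1 saw 10 batches of 10 documents (100 total), but D2 accounts for only:
  println(d2Counts.sum)
}
```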

A document in my case is a (key, value) tuple; the key objects here are the
same for all records.

Does anyone have an idea what might be happening to my other 60 documents?

Thank you so much in advance!

Regards,
Jeffrey
