Thanks a lot @Das @Cody. I moved from the receiver-based approach to the
direct stream, and I can now get the topics from the offset ranges!!
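
For anyone following along, a minimal sketch of how the topic comes back with
the offsets on a direct stream (this assumes the Spark 1.x
spark-streaming-kafka API; kafkaParams and topics here are placeholders for
your own setup, not values from this thread):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val directStream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, topics)

directStream.foreachRDD { rdd =>
  // Each OffsetRange carries the topic, partition and offset boundaries
  // of this batch, so the topic is recoverable per partition.
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { range =>
    println(s"${range.topic} ${range.partition}: " +
      s"${range.fromOffset} -> ${range.untilOffset}")
  }
}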

On Fri, Jul 31, 2015 at 4:41 PM, Brandon White <bwwintheho...@gmail.com>
wrote:

> Tathagata,
>
> Could the bottleneck possibly be the number of executor nodes in our
> cluster? Since we are creating 500 DStreams based on 500 text file
> directories, do we need at least 500 executors / nodes to act as receivers,
> one for each stream?
>
> On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> @Ashwin: You could append the topic to the data.
>>
>> val kafkaStreams = topics.map { topic =>
>>     KafkaUtils.createDirectStream(....topic...).map { x => (x, topic) }
>> }
>> val unionedStream = context.union(kafkaStreams)
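>>
>> Spelled out a bit more (just a sketch; the decoder types and kafkaParams
>> are assumptions, not something from this thread):
>>
>> import kafka.serializer.StringDecoder
>> import org.apache.spark.streaming.kafka.KafkaUtils
>>
>> val kafkaStreams = topics.map { topic =>
>>   KafkaUtils.createDirectStream[String, String, StringDecoder,
>>     StringDecoder](ssc, kafkaParams, Set(topic))
>>     .map { case (_, value) => (value, topic) } // tag each record with its topic
>> }
>> val unionedStream = ssc.union(kafkaStreams.toSeq)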
>>
>>
>> @Brandon:
>> I don't recommend it, but you could do something crazy: use foreachRDD to
>> farm the jobs out to a thread pool, and have a final foreachRDD wait for
>> all of those jobs to complete.
>>
>> manyDStreams.foreach { dstream =>
>>   dstream.foreachRDD { rdd =>
>>     // Add a runnable that runs the job on the RDD to the thread pool.
>>     // This does not wait for the job to finish.
>>   }
>> }
>>
>> anyOfTheManyDStreams.foreachRDD { _ =>
>>   // Wait for all of the current batch's jobs in the thread pool to complete.
>> }
>>
>>
>> This would run all the Spark jobs in the batch in parallel in the thread
>> pool, while also making sure all the jobs finish before the batch is
>> marked as completed.
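>>
>> A rough sketch of that pattern with scala.concurrent Futures (the pool
>> size, the queue and the Await timeout below are my own assumptions, not
>> part of the suggestion above):
>>
>> import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
>> import scala.concurrent.{Await, ExecutionContext, Future}
>> import scala.concurrent.duration._
>>
>> // Driver-side pool used to submit the per-DStream Spark jobs concurrently.
>> implicit val pool =
>>   ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(16))
>> val pendingJobs = new ConcurrentLinkedQueue[Future[Unit]]()
>>
>> manyDStreams.foreach { dstream =>
>>   dstream.foreachRDD { rdd =>
>>     // Submit the job asynchronously; this does not block the batch.
>>     pendingJobs.add(Future { rdd.foreach(_ => ()) /* real work goes here */ })
>>   }
>> }
>>
>> manyDStreams.head.foreachRDD { _ =>
>>   // Registered last, so it runs after all the jobs above were submitted;
>>   // block until every job of this batch has finished.
>>   while (!pendingJobs.isEmpty) Await.result(pendingJobs.poll(), 10.minutes)
>> }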
>>
>> On Tue, Jul 28, 2015 at 4:05 PM, Brandon White <bwwintheho...@gmail.com>
>> wrote:
>>
>>> Thank you Tathagata. My main use case for the 500 streams is to append
>>> new elements into their corresponding Spark SQL tables. Every stream is
>>> mapped to a table, so I'd like to use the streams to append the new RDDs
>>> to their tables. If I union all the streams, appending new elements
>>> becomes a nightmare. So is there no other way to parallelize something
>>> like the following? Will this still run sequentially or time out?
>>>
>>> // 500 (tableName, stream) pairs
>>> streams.foreach { case (tableName, stream) =>
>>>   stream.foreachRDD { rdd =>
>>>     val df = sqlContext.jsonRDD(rdd)
>>>     df.saveAsTable(tableName, SaveMode.Append)
>>>   }
>>> }
>>>
>>> On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> I don't think anyone has really run 500 text streams.
>>>> The par sequences do nothing there; you are only parallelizing the
>>>> setup code, which does not really compute anything. It also sets up 500
>>>> foreachRDD operations that will get executed sequentially in each batch,
>>>> so it does not make sense. The right way to parallelize this is to union
>>>> all the streams.
>>>>
>>>> val streams = streamPaths.map { path =>
>>>>   ssc.textFileStream(path)
>>>> }
>>>> val unionedStream = ssc.union(streams)
>>>> unionedStream.foreachRDD { rdd =>
>>>>   // do something
>>>> }
>>>>
>>>> Then there is only one foreachRDD executed in every batch, and it will
>>>> process all the new files in each batch interval in parallel.
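>>>>
>>>> For the per-table append case mentioned above, one option (just a
>>>> sketch; pathToTable is a hypothetical path-to-table mapping) is to tag
>>>> each line with its source directory before the union, so the single
>>>> foreachRDD can still route rows to the right table:
>>>>
>>>> val streams = streamPaths.map { path =>
>>>>   // pair every line with the directory it was read from
>>>>   ssc.textFileStream(path).map(line => (path, line))
>>>> }
>>>> val unionedStream = ssc.union(streams)
>>>> unionedStream.foreachRDD { rdd =>
>>>>   // rdd holds (sourcePath, line) pairs for all 500 directories;
>>>>   // e.g. look up pathToTable(sourcePath) when writing each group out.
>>>> }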
>>>> TD
>>>>
>>>>
>>>> On Tue, Jul 28, 2015 at 3:06 PM, Brandon White <bwwintheho...@gmail.com
>>>> > wrote:
>>>>
>>>>> val ssc = new StreamingContext(sc, Minutes(10))
>>>>>
>>>>> //500 textFile streams watching S3 directories
>>>>> val streams = streamPaths.par.map { path =>
>>>>>   ssc.textFileStream(path)
>>>>> }
>>>>>
>>>>> streams.par.foreach { stream =>
>>>>>   stream.foreachRDD { rdd =>
>>>>>     //do something
>>>>>   }
>>>>> }
>>>>>
>>>>> ssc.start()
>>>>>
>>>>> Would something like this scale? What would be the limiting factor to
>>>>> performance? What is the best way to parallelize this? Any other ideas on
>>>>> design?
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
Thanks & Regards,
Ashwin Giridharan
