Re: Is the operation inside foreachRDD supposed to be blocking?
Yes, this is a stupid example. In my real code, the processItems method uses a third-party library which does things asynchronously and returns a Future.

On Fri, Jul 8, 2016 at 3:11 PM, Sean Owen wrote:
> You can write this code. I don't think it will do anything useful because
> you're executing asynchronously but then just blocking waiting for
> completion. It seems the same as just doing all the work in processItems()
> directly.

--
*Mikael Ståldal*
Senior software developer
*Magine TV*
mikael.stal...@magine.com
Grev Turegatan 3 | 114 46 Stockholm, Sweden | www.magine.com
Re: Is the operation inside foreachRDD supposed to be blocking?
You can write this code. I don't think it will do anything useful because you're executing asynchronously but then just blocking waiting for completion. It seems the same as just doing all the work in processItems() directly.

On Fri, Jul 8, 2016 at 1:56 PM, Mikael Ståldal wrote:
> I am not sure I fully understand your answer.
>
> Is this code correct?
>
> def main() {
>   KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder,
>     DefaultDecoder](streamingContext, configs, topics).foreachRDD { rdd =>
>     Await.ready(processItems(rdd.collect()), Duration.Inf)
>   }
> }
>
> def processItems(items: Array[(String, Array[Byte])]): Future[Unit] = {
>   // start some work which may take a while and immediately return a Future
>   // to keep track of the work
> }
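To illustrate the point in the reply above: starting work asynchronously and then immediately awaiting it keeps the calling thread occupied for the whole duration, much as a direct synchronous call would. The sketch below uses only the Scala standard library. The processItems here is a hypothetical stand-in that sleeps for 200 ms, not the method from the original question, and the BlockingOnFuture object name is made up for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingOnFuture {
  // Hypothetical stand-in for processItems: starts slow work on another
  // thread and returns a Future immediately.
  def processItems(items: Array[String]): Future[Int] =
    Future {
      Thread.sleep(200) // simulate work that takes a while
      items.length
    }

  def main(args: Array[String]): Unit = {
    val start = System.nanoTime()
    val future = processItems(Array("a", "b", "c"))
    // The caller does nothing else in the meantime: Await.result parks this
    // thread until the Future completes (or the timeout expires).
    val count = Await.result(future, 5.seconds)
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println(s"processed $count items, caller was blocked for about $elapsedMs ms")
  }
}
```

The caller regains control only after the simulated work has finished, so the asynchronous start buys nothing unless the caller does other useful work before awaiting.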
Re: Is the operation inside foreachRDD supposed to be blocking?
I am not sure I fully understand your answer.

Is this code correct?

def main() {
  KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder,
    DefaultDecoder](streamingContext, configs, topics).foreachRDD { rdd =>
    Await.ready(processItems(rdd.collect()), Duration.Inf)
  }
}

def processItems(items: Array[(String, Array[Byte])]): Future[Unit] = {
  // start some work which may take a while and immediately return a Future
  // to keep track of the work
}

On Fri, Jul 8, 2016 at 12:56 PM, Sean Owen wrote:
> It's no different than any other operation on an RDD. A transformation
> doesn't actually do anything by itself, so does not block. An action
> triggers computation and blocks until the action completes. You can wait
> for it with a Future, sure.
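A side note on the Await.ready call in the code above: in the Scala standard library, Await.ready waits for completion but does not rethrow a failure held by the Future, whereas Await.result does. A minimal sketch, assuming a hypothetical failingCall that stands in for an asynchronous library call that fails (the AwaitReadyVsResult object name is also made up for the example):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object AwaitReadyVsResult {
  // Hypothetical asynchronous call whose Future completes with a failure.
  def failingCall(): Future[Unit] =
    Future.failed(new RuntimeException("boom"))

  def main(args: Array[String]): Unit = {
    // Await.ready hands back the completed Future; the failure stays inside
    // it and has to be inspected explicitly.
    val completed = Await.ready(failingCall(), 5.seconds)
    println(s"ready: failed = ${completed.value.exists(_.isFailure)}") // true

    // Await.result rethrows the failure on the calling thread; Try captures
    // it here so the program can print the outcome.
    val outcome = Try(Await.result(failingCall(), 5.seconds))
    println(s"result: failed = ${outcome.isFailure}") // true
  }
}
```

If the intent is for an error in the asynchronous work to surface in the code that awaits it, Await.result (or an explicit check of the completed Future) may be the safer choice. A finite timeout instead of Duration.Inf is used here only so the sketch cannot hang.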
Re: Is the operation inside foreachRDD supposed to be blocking?
It's no different than any other operation on an RDD. A transformation doesn't actually do anything by itself, so it does not block. An action triggers computation and blocks until the action completes. You can wait for it with a Future, sure.

On Fri, Jul 8, 2016 at 10:43 AM, Mikael Ståldal wrote:
> In a Spark Streaming job, is the operation inside foreachRDD supposed to be
> synchronous / blocking?
>
> What if you do some asynchronous operation which returns a Future? Are you
> then supposed to do Await on that Future?
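The lazy-versus-eager distinction described above can be mimicked with plain Scala collections. This is only an analogy and does not use the Spark API: mapping over an Iterator plays the role of a transformation, and toList plays the role of an action. The LazyThenForce object and its twice method are hypothetical names.

```scala
object LazyThenForce {
  // Counts how many times the mapping function has actually been run.
  var calls = 0

  def twice(x: Int): Int = {
    calls += 1
    x * 2
  }

  def main(args: Array[String]): Unit = {
    val items = List(1, 2, 3)

    // Like a transformation: this only describes the work, nothing runs yet.
    val pending = items.iterator.map(twice)
    println(s"calls after map: $calls") // 0

    // Like an action: forcing the result runs the work and returns only
    // once every element has been processed.
    val result = pending.toList
    println(s"calls after toList: $calls") // 3
    println(s"result: $result") // List(2, 4, 6)
  }
}
```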