Where does "task_batches" come from?

On 22 Jun 2015 4:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
> Thanks,
>
> I've updated my code to use updateStateByKey but am still getting these
> errors when I resume from a checkpoint.
>
> One thought of mine was that I used sc.parallelize to generate the RDDs
> for the queue, but perhaps on resume, it doesn't recreate the context
> needed?
>
> --
> Shaanan Cohney
> PhD Student
> University of Pennsylvania
>
> shaan...@gmail.com
>
> On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet <benjamin.fra...@gmail.com> wrote:
>
>> I would suggest you have a look at the updateStateByKey transformation in
>> the Spark Streaming programming guide, which should fit your needs better
>> than your update_state function.
>> On 22 Jun 2015 1:03 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>
>>> counts is a list (counts = []) in the driver, used to collect the
>>> results.
>>> It seems like it's also not the best way to be doing things, but I'm new
>>> to Spark and editing someone else's code, so I'm still learning.
>>> Thanks!
>>>
>>> def update_state(out_files, counts, curr_rdd):
>>>     try:
>>>         for c in curr_rdd.collect():
>>>             fnames, count = c
>>>             counts.append(count)
>>>             out_files |= fnames
>>>     except Py4JJavaError as e:
>>>         print("EXCEPTION: %s" % str(e))
>>>
>>> --
>>> Shaanan Cohney
>>> PhD Student
>>> University of Pennsylvania
>>>
>>> shaan...@gmail.com
>>>
>>> On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet <benjamin.fra...@gmail.com> wrote:
>>>
>>>> What does "counts" refer to?
>>>>
>>>> Could you also paste the code of your "update_state" function?
>>>> On 22 Jun 2015 12:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>>>
>>>>> I'm receiving the SPARK-5063 error (RDD transformations and actions
>>>>> can only be invoked by the driver, not inside of other transformations)
>>>>> whenever I try to restore from a checkpoint in Spark Streaming in my app.
>>>>>
>>>>> I'm using python3 and my RDDs are inside a queueStream DStream.
>>>>>
>>>>> This is the little chunk of code causing issues:
>>>>>
>>>>> -----
>>>>>
>>>>> p_batches = [sc.parallelize(batch) for batch in task_batches]
>>>>>
>>>>> sieving_tasks = ssc.queueStream(p_batches)
>>>>> sieving_tasks.checkpoint(20)
>>>>> relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
>>>>> relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])).foreachRDD(lambda s: update_state(out_files, counts, s))
>>>>> ssc.checkpoint(s3n_path)
>>>>>
>>>>> -----
>>>>>
>>>>> Thanks again!
>>>>>
>>>>> --
>>>>> Shaanan Cohney
>>>>> PhD Student
>>>>> University of Pennsylvania
>>>>>
>>>>> shaan...@gmail.com
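For reference, the updateStateByKey approach suggested in this thread could be sketched roughly as follows. This is only an illustration, not the poster's actual fix: the value shape (a `(fnames, count)` pair, matching what `run_sieving_command` appears to return) and the constant `"sieving"` key are assumptions. The core is the state-update function, which merges each batch's results into Spark-managed state instead of appending to a driver-side list:

```python
# Hypothetical state-update function for updateStateByKey.
# new_values: the (fnames, count) pairs that arrived in this batch
# last_state: the previously accumulated (fnames, total) pair, or None

def update_func(new_values, last_state):
    """Merge this batch's (fnames, count) pairs into the running state."""
    fnames, total = last_state if last_state is not None else (set(), 0)
    for batch_fnames, batch_count in new_values:
        fnames |= batch_fnames   # union the output filenames
        total += batch_count     # accumulate the relation count
    return (fnames, total)

# Wiring it into the DStream would look roughly like (not run here):
#   keyed = relations.map(lambda r: ("sieving", r))
#   state = keyed.updateStateByKey(update_func)
```

The point of this shape is that the accumulated state lives in Spark's checkpointable lineage rather than in closures over driver-side objects like `counts` and `out_files`, which is the kind of capture that tends to break on checkpoint restore.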