Where does "task_batches" come from?
On 22 Jun 2015 4:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:

> Thanks,
>
> I've updated my code to use updateStateByKey but am still getting these
> errors when I resume from a checkpoint.
>
> One thought: I used sc.parallelize to generate the RDDs for the queue,
> but perhaps on resume the context those RDDs need isn't recreated?
>
>
>
> --
>
> Shaanan Cohney
> PhD Student
> University of Pennsylvania
>
>
> shaan...@gmail.com
>
> On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet <
> benjamin.fra...@gmail.com> wrote:
>
>> I would suggest you have a look at the updateStateByKey transformation in
>> the Spark Streaming programming guide which should fit your needs better
>> than your update_state function.
>> On 22 Jun 2015 1:03 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>
>>> counts is a list (counts = []) in the driver, used to collect the
>>> results.
>>> It's probably not the best way to do this, but I'm new to Spark and
>>> editing someone else's code, so I'm still learning.
>>> Thanks!
>>>
>>>
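>>> from py4j.protocol import Py4JJavaError
>>>
>>> def update_state(out_files, counts, curr_rdd):
>>>     # Called from foreachRDD, so this runs on the driver; collect()
>>>     # brings the reduced (fnames, count) pairs back from the executors.
>>>     try:
>>>         for c in curr_rdd.collect():
>>>             fnames, count = c
>>>             counts.append(count)
>>>             out_files |= fnames
>>>     except Py4JJavaError as e:
>>>         print("EXCEPTION: %s" % str(e))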
>>>
>>> On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet <
>>> benjamin.fra...@gmail.com> wrote:
>>>
>>>> What does "counts" refer to?
>>>>
>>>> Could you also paste the code of your "update_state" function?
>>>> On 22 Jun 2015 12:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>>>
>>>>> I'm receiving the SPARK-5063 error (RDD transformations and actions
>>>>> can only be invoked by the driver, not inside of other transformations)
>>>>> whenever I try to restore from a checkpoint in Spark Streaming in my app.
>>>>>
>>>>> I'm using Python 3 and my RDDs are inside a queueStream DStream.
>>>>>
>>>>> This is the little chunk of code causing issues:
>>>>>
>>>>> -----
>>>>>
>>>>> p_batches = [sc.parallelize(batch) for batch in task_batches]
>>>>>
>>>>> sieving_tasks = ssc.queueStream(p_batches)
>>>>> sieving_tasks.checkpoint(20)
>>>>> relations = sieving_tasks.map(
>>>>>     lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
>>>>> relations.reduce(
>>>>>     lambda a, b: (a[0] | b[0], a[1] + b[1])
>>>>> ).foreachRDD(lambda s: update_state(out_files, counts, s))
>>>>> ssc.checkpoint(s3n_path)
>>>>>
>>>>> -----
>>>>>
>>>>> Thanks again!
>>>>
>>>
>
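
For reference, the updateStateByKey suggestion quoted above could look roughly like the sketch below. It is only an illustration under assumptions: the function name update_totals, the single key "all", and the file names are made up here, and it assumes each element of the relations stream is a (set of file names, count) pair, as the reduce in the original snippet implies. The update function is plain Python, so it can be tried locally without a Spark cluster.

```python
def update_totals(new_values, state):
    """Merge one batch's (fnames, count) pairs into the running state.

    Matches the shape updateStateByKey expects for its update function:
    a list of new values for a key, plus that key's previous state
    (None the first time the key is seen).
    """
    files, total = state if state is not None else (frozenset(), 0)
    for fnames, count in new_values:
        files = files | frozenset(fnames)
        total += count
    return files, total


# Hypothetical wiring in the streaming job (not executed here):
#   totals = relations.map(lambda r: ("all", r)).updateStateByKey(update_totals)

# Local simulation of two batches, using made-up file names.
state = None
state = update_totals([({"a.rel"}, 3), ({"b.rel"}, 4)], state)
state = update_totals([({"c.rel"}, 5)], state)
print(sorted(state[0]), state[1])
```

With this shape the running totals live in Spark's checkpointed state rather than in a driver-side list such as counts, which is the main point of the suggestion. Whether that alone resolves the SPARK-5063 error on resume is not established in this thread.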
