protobuf data as input to spark streaming

2022-04-05 Thread Kiran Biswal
Hello Experts

Has anyone used protobuf (proto3) encoded data (from kafka) as input source
and been able to do spark structured streaming?

I would appreciate if you can share any sample code/example

Regards
Kiran
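
One common approach, sketched below, is to read the raw Kafka bytes with the Structured Streaming Kafka source and decode them in a UDF using the protoc-generated proto3 classes. This is only an illustrative sketch: the message type (event_pb2.Event), its fields (id, ts), and the broker/topic names are assumptions, not something from this thread. Newer Spark releases (3.4+) also ship a spark-protobuf module with from_protobuf, which avoids the hand-written UDF.

    # Sketch: Structured Streaming from Kafka with proto3-encoded values.
    # Requires the spark-sql-kafka-0-10 package on the classpath and a
    # protoc-generated module (here assumed to be event_pb2 with message Event).
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StructType, StructField, StringType, LongType

    import event_pb2  # hypothetical: protoc --python_out=. event.proto

    spark = SparkSession.builder.appName("proto-stream").getOrCreate()

    # Fields of the assumed Event message; adjust to your .proto definition.
    event_schema = StructType([
        StructField("id", StringType()),
        StructField("ts", LongType()),
    ])

    def decode(raw):
        msg = event_pb2.Event()
        msg.ParseFromString(bytes(raw))   # Kafka value arrives as binary
        return (msg.id, msg.ts)

    decode_udf = udf(decode, event_schema)

    events = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "localhost:9092")  # assumption
              .option("subscribe", "events")                        # assumption
              .load()
              .select(decode_udf(col("value")).alias("event"))
              .select("event.*"))

    query = events.writeStream.format("console").start()
    query.awaitTermination()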



Re: loop of spark jobs leads to increase in memory on worker nodes and eventually failure

2022-04-05 Thread Gourav Sengupta
Hi,

can you please give details around:
- Spark version
- what operation you are running, and why in a loop
- whether you are caching any data or not
- whether you are creating variables by referencing themselves, as in x = x + 1

Thanks and Regards,
Gourav Sengupta

On Mon, Apr 4, 2022 at 10:51 AM Joris Billen 
wrote:

> Clear, probably not a good idea.
>
> But a previous comment said “you are doing everything in the end in one
> go”. So this made me wonder: if your only action is a write at the end,
> after lots of complex transformations, what is the alternative to writing
> at the end, which means doing everything all at once? My understanding is
> that if there is no need for an earlier action, everything is done at the
> end, which means there is a limit to how many days you can process at once.
> Hence the solution is to loop over a couple of days and always submit the
> same Spark job, just with different input.
>
>
> Thanks!
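
A minimal sketch of such a driver loop, submitting the same job once per day so each run gets a fresh driver and executors and releases its memory when it exits; the script name and --date flag are made up for illustration:

    # Hypothetical outer loop: one spark-submit per day.
    import subprocess
    from datetime import date, timedelta

    day, end = date(2022, 1, 1), date(2022, 1, 31)
    while day <= end:
        subprocess.run(
            ["spark-submit", "process_day.py", "--date", day.isoformat()],
            check=True,  # stop the loop if one day's job fails
        )
        day += timedelta(days=1)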
>
> On 1 Apr 2022, at 15:26, Sean Owen  wrote:
>
> This feels like premature optimization, and it's not clear it actually
> optimizes anything, but maybe.
> Caching things that are used only once is worse than not caching. It looks
> like a straight line through to the write, so I doubt caching helps
> anything here.
>
> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen 
> wrote:
>
>> Hi,
>> as said, thanks for the little discussion over mail.
>> I understand that the action is triggered at the end at the write, and then
>> all of a sudden everything is executed at once. But I don't really need to
>> trigger an action before that. I am caching somewhere a df that will be
>> reused several times (slightly updated pseudocode below).
>>
>> Question: is it then better practice to already trigger some actions on
>> intermediate data frames (like df4 and df8) and cache them? So that these
>> actions will not be that expensive yet, and the actions to write at the end
>> will require fewer resources, which would allow processing more days in one
>> go? Like what is added in the improvement section in the pseudocode below?
>>
>>
>>
>> pseudocode:
>>
>> loop over all days:
>>     spark submit 1 day
>>
>> with spark submit (overly simplified) =
>>
>>   df=spark.read(hdfs://somepath)
>>   ...
>>   ##IMPROVEMENT START
>>   df4=spark.sql(some stuff with df3)
>>   spark.sql(CACHE TABLE df4)
>>   ...
>>   df8=spark.sql(some stuff with df7)
>>   spark.sql(CACHE TABLE df8)
>>   ##IMPROVEMENT END
>>   ...
>>   df12=spark.sql(complex stuff with df11)
>>   spark.sql(CACHE TABLE df12)
>>   ...
>>   df13=spark.sql(some complex stuff with df12)
>>   df13.write
>>   df14=spark.sql(some other complex stuff with df12)
>>   df14.write
>>   df15=spark.sql(some completely other complex stuff with df12)
>>   df15.write
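
For reference, a self-contained PySpark sketch of the pattern above: cache the DataFrame that several downstream writes reuse, materialize it once with an eager action, and unpersist it before the next iteration. The data, columns and output paths below are made up.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("cache-sketch").getOrCreate()

    # Stand-in for the reused intermediate result (df12 in the pseudocode).
    df = spark.range(1_000_000).withColumnRenamed("id", "user_id")
    df.cache()
    df.count()  # eager action so the cache is populated exactly once

    # Several writes reuse the cached DataFrame instead of recomputing it.
    df.filter("user_id % 2 = 0").write.mode("overwrite").parquet("/tmp/out_even")
    df.filter("user_id % 2 = 1").write.mode("overwrite").parquet("/tmp/out_odd")
    df.groupBy((df.user_id % 10).alias("bucket")).count() \
      .write.mode("overwrite").parquet("/tmp/out_counts")

    df.unpersist()  # release executor memory before the next day's run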
>>
>>
>>
>>
>>
>>
>> Thanks!
>>
>>
>>
>> On 31 Mar 2022, at 14:37, Sean Owen  wrote:
>>
>> If that is your loop unrolled, then you are not doing parts of the work at
>> a time. That will execute all operations in one go when the write finally
>> happens. That's OK, but it may be part of the problem. For example, if you
>> are filtering for a subset, processing, and unioning, that is just a harder
>> and slower way of applying the transformation to all the data at once.
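
To illustrate that last point with a made-up example: filtering per day in a loop and unioning the pieces back together is usually just a slower way of applying the same transformation to all the data at once.

    from functools import reduce
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("union-vs-single-pass").getOrCreate()
    df = spark.createDataFrame(
        [(d, v) for d in range(1, 6) for v in range(10)], ["day", "x"])
    days = [1, 2, 3]

    # loop-and-union: one filter per day, then stitch the pieces back together
    parts = [df.filter(F.col("day") == d).withColumn("x2", F.col("x") * 2)
             for d in days]
    looped = reduce(lambda a, b: a.union(b), parts)

    # equivalent single pass over the same data
    direct = df.filter(F.col("day").isin(days)).withColumn("x2", F.col("x") * 2)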
>>
>> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Thanks for the reply :-)
>>>
>>> I am using pyspark. Basically my code (simplified) is:
>>>
>>> df=spark.read.csv("hdfs://somehdfslocation")
>>> df1=spark.sql(complex statement using df)
>>> ...
>>> dfx=spark.sql(complex statement using dfx-1)
>>> ...
>>> df15.write()
>>>
>>>
>>> What exactly is meant by "closing resources"? Is it just unpersisting
>>> cached dataframes at the end and stopping the spark context explicitly:
>>> sc.stop()?
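
As a rough sketch, "closing resources" at the end of one iteration usually amounts to something like the following (assuming a SparkSession named spark):

    spark.catalog.clearCache()  # or df.unpersist() per cached DataFrame
    spark.stop()                # also stops the underlying SparkContext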
>>>
>>>
>>> For processing many years at once versus a chunk in a loop: I see that if
>>> I go up to a certain number of days, one iteration will start to have tasks
>>> that fail. So I only take a limited number of days and repeat this process
>>> several times. Isn't this normal, as you are always somehow limited in
>>> terms of resources (I have 9 nodes with 32GB)? Or could you in theory
>>> process any volume, if you wait long enough? I guess Spark can only break
>>> the work down into tasks up to a certain level (based on the datasets' and
>>> the intermediate results' partitions), and at some point you hit the limit
>>> where your resources are no longer sufficient to process one such task?
>>> Maybe you can tweak it a bit, but in the end you'll hit a limit?
>>>
>>>
>>>
>>> Concretely, the following topics would be interesting to find out more
>>> about (links):
>>> - where to see what you are still consuming after a spark job ended, if
>>> you didn't close resources
>>> - memory leaks for pyspark
>>> - a good article about closing resources (you find tons of snippets on how
>>> to start