Re: Clarification with Spark Structured Streaming

2023-10-09 Thread Danilo Sousa
Unsubscribe

> On 9 Oct 2023, at 07:03, Mich Talebzadeh wrote:
> 
> Hi,
> 
> Please see my responses below:
> 
> 1) In Spark Structured Streaming does commit mean streaming data has been 
> delivered to the sink like Snowflake?
> 
> No, a commit does not refer to data being delivered to a sink like Snowflake 
> or BigQuery. The term commit refers to Spark Structured Streaming (SSS) 
> internals. Specifically, it means that a micro-batch of data has been 
> processed by SSS. In the checkpoint directory there is a subdirectory called 
> commits that marks the micro-batch as completed.
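> 
> As an illustration, here is a minimal Scala sketch (the format and paths are 
> placeholders, not from this thread) of a streaming write with a checkpoint 
> location; once micro-batch N completes, SSS records a file named N under commits/:
> 
> // Any streaming DataFrame `df` written with a checkpoint location (illustrative paths).
> val query = df.writeStream
>   .format("parquet")
>   .option("path", "/tmp/out")                       // sink location
>   .option("checkpointLocation", "/tmp/checkpoint")  // offsets/ and commits/ live here
>   .start()
> // After micro-batch N finishes, /tmp/checkpoint/commits/N appears, meaning SSS has
> // processed that batch -- not that the downstream sink has durably stored it.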
> 
> 2) If sinks like Snowflake cannot absorb or digest streaming data in a 
> timely manner, will there be an impact on Spark streaming itself?
> 
> Yes, it can potentially impact SSS. If the sink cannot absorb data in a timely 
> manner, the batches will start to back up in SSS. This can cause Spark to run 
> out of memory and the streaming job to fail. As I understand it, Spark will 
> use a combination of memory and disk storage (checkpointing). This can also 
> happen if the network interface between Spark and the sink is disrupted. On 
> the other hand, Spark may slow down as it tries to process the backed-up 
> batches of data. You want to avoid these scenarios.
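> 
> One common mitigation (a sketch only, assuming a Kafka source; broker, topic 
> and values are illustrative) is to cap how much each micro-batch reads, so a 
> slow sink stretches batch duration rather than letting an unbounded backlog build up:
> 
> val stream = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "broker:9092")   // placeholder broker
>   .option("subscribe", "ticker")                       // placeholder topic
>   .option("maxOffsetsPerTrigger", "10000")             // upper bound on offsets per trigger
>   .load()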
> 
> HTH
> 
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
> 
>  https://en.everybodywiki.com/Mich_Talebzadeh
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
> On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID wrote:
>> Hello team
>> 
>> 1) In Spark Structured Streaming does commit mean streaming data has been 
>> delivered to the sink like Snowflake?
>> 
>> 2) If sinks like Snowflake cannot absorb or digest streaming data in a 
>> timely manner, will there be an impact on Spark streaming itself?
>> 
>> Thanks
>> 
>> AK






Re: Help With unstructured text file with spark scala

2022-02-25 Thread Danilo Sousa
Rafael Mendes,

Are you from ?

Thanks.
> On 21 Feb 2022, at 15:33, Danilo Sousa  wrote:
> 
> Yes, this is only a single file.
> 
> Thanks Rafael Mendes.
> 
>> On 13 Feb 2022, at 07:13, Rafael Mendes <rafaelpir...@gmail.com> wrote:
>> 
>> Hi, Danilo.
>> Do you have a single large file, only?
>> If so, I guess you can use tools like sed/awk to split it into more files 
>> based on layout, so you can read these files into Spark.
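>> 
>> If pre-processing with shell tools isn't convenient, a rough Scala sketch of 
>> the same idea (splitting the mixed file by layout before loading each part 
>> into Spark) could look like this -- assuming the record lines are the ones 
>> with exactly two '#' separators, which is only a guess about the layout:
>> 
>> import java.io.PrintWriter
>> import scala.io.Source
>> 
>> val src = Source.fromFile("1.txt")
>> val headerOut = new PrintWriter("header_lines.txt")   // key/value style lines
>> val recordOut = new PrintWriter("record_lines.txt")   // Plano#Código#Nome style lines
>> for (line <- src.getLines()) {
>>   if (line.count(_ == '#') == 2) recordOut.println(line)
>>   else if (line.trim.nonEmpty) headerOut.println(line)
>> }
>> src.close(); headerOut.close(); recordOut.close()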
>> 
>> 
>> On Wed, 9 Feb 2022 at 09:30, Bitfox <bit...@bitfox.top> wrote:
>> Hi
>> 
>> I am not sure about the total situation.
>> But if you want a Scala implementation, I think you could use regexes to 
>> match and capture the keywords.
>> Here is one you can modify on your end.
>> 
>> import scala.io.Source
>> import scala.collection.mutable.ArrayBuffer
>> 
>> // accumulators for the captured fields
>> val list1 = ArrayBuffer[(String,String,String)]()   // three-field lines
>> val list2 = ArrayBuffer[(String,String)]()          // two-field lines
>> 
>> // patt1 captures three '#'-separated fields; patt2 captures two (tried if patt1 fails)
>> val patt1 = """^(.*)#(.*)#([^#]*)$""".r
>> val patt2 = """^(.*)#([^#]*)$""".r
>> 
>> val file = "1.txt"
>> val lines = Source.fromFile(file).getLines()
>> 
>> for (x <- lines) {
>>   x match {
>>     case patt1(k, v, z) => list1 += ((k, v, z))
>>     case patt2(k, v)    => list2 += ((k, v))
>>     case _              => println("no match")
>>   }
>> }
>> 
>> 
>> Now list1 and list2 hold the elements you wanted; you can convert them to 
>> DataFrames easily.
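>> 
>> For instance, a minimal sketch of that conversion (assuming a SparkSession 
>> named `spark`, e.g. in spark-shell; the column names are only illustrative):
>> 
>> import spark.implicits._
>> 
>> val df1 = list1.toSeq.toDF("key", "value", "extra")   // lines matched by patt1
>> val df2 = list2.toSeq.toDF("key", "value")            // lines matched by patt2
>> df1.show(false)
>> df2.show(false)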
>> 
>> Thanks.
>> 
>> On Wed, Feb 9, 2022 at 7:20 PM Danilo Sousa <danilosousa...@gmail.com> wrote:
>> Hello
>> 
>> 
>> Yes, this block I can open as CSV with the # delimiter, but there is another 
>> block that is not in CSV format.
>> 
>> That block is more like key/value data.
>> 
>> We have two different layouts in the same file. That is the “problem”.
>> 
>> Thanks for your time.
>> 
>> 
>> 
>>> Relação de Beneficiários Ativos e Excluídos
>>> Carteira em#27/12/2019##Todos os Beneficiários
>>> Operadora#AMIL
>>> Filial#SÃO PAULO#Unidade#Guarulhos
>>> 
>>> Contrato#123456 - Test
>>> Empresa#Test
>> 
>>> On 9 Feb 2022, at 00:58, Bitfox <bit...@bitfox.top> wrote:
>>> 
>>> Hello
>>> 
>>> You can treat it as a CSV file and load it with Spark:
>>> 
>>> >>> df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").option("sep","#").load(csv_file)
>>> >>> df.show()
>>> +--------------------+-------------------+-----------------+
>>> |               Plano|Código Beneficiário|Nome Beneficiário|
>>> +--------------------+-------------------+-----------------+
>>> |58693 - NACIONAL ...|           65751353|       Jose Silva|
>>> |58693 - NACIONAL ...|           65751388|      Joana Silva|
>>> |58693 - NACIONAL ...|           65751353|     Felipe Silva|
>>> |58693 - NACIONAL ...|           65751388|      Julia Silva|
>>> +--------------------+-------------------+-----------------+
>>> 
>>> 
>>> cat csv_file:
>>> 
>>> Plano#Código Beneficiário#Nome Beneficiário
>>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>>> 
>>> 
>>> Regards
>>> 
>>> 
>>> On Wed, Feb 9, 2022 at 12:50 AM Danilo Sousa <danilosousa...@gmail.com> wrote:
>>> Hi
>>> I have to transform unstructured text to dataframe.
>>> Could anyone please help with Scala code ?
>>> 
>>> Dataframe need as:
>>> 
>>> operadora filial unidade contrato empresa plano codigo_beneficiario 
>>> nome_beneficiario
>>> 
>>> Relação de Beneficiários Ativos e Excluídos
>>> Carteira em#27/12/2019##Todos os Beneficiários
>>> Operadora#AMIL
>>> Filial#SÃO PAULO#Unidade#Guarulhos
>>> 
>>> Contrato#123456 - Test
>>> Empresa#Test
>>> Plano#Código Beneficiário#Nome Beneficiário
>>> 58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
>>> 58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
>>> 
>>> Contrato#898011000 - FUNDACAO GERDAU
>>> Empresa#FUNDACAO GERDAU
>>> Plano#Código Beneficiário#Nome Beneficiário
>>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> 
>> 
> 



Re: StructuredStreaming - foreach/foreachBatch

2022-02-21 Thread Danilo Sousa
Hello Gourav,

I’ll read this document.


Thanks.

> On 17 Feb 2022, at 14:05, Gourav Sengupta  wrote:
> 
> Hi,
> 
> The following excellent documentation may help as well: 
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>  
> 
>  
> 
> The book from Dr. Zaharia on Spark does a fantastic job of explaining the 
> fundamental thinking behind these concepts.
> 
> 
> Regards,
> Gourav Sengupta
> 
> 
> 
> On Wed, Feb 9, 2022 at 8:51 PM karan alang wrote:
> Thanks, Mich .. will check it out
> 
> regds,
> Karan Alang
> 
> On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh wrote:
> BTW you can check this Linkedin article of mine on Processing Change Data 
> Capture with Spark Structured Streaming 
> 
> 
> It covers the concept of triggers, including trigger(once = True), i.e. a 
> one-time batch, in Spark Structured Streaming.
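> 
> For reference, a minimal Scala sketch of that one-time trigger (the sink 
> format, paths and the streaming DataFrame `df` are illustrative placeholders):
> 
> import org.apache.spark.sql.streaming.Trigger
> 
> df.writeStream
>   .format("parquet")
>   .option("path", "/tmp/out")
>   .option("checkpointLocation", "/tmp/checkpoint")
>   .trigger(Trigger.Once())   // process whatever is available once, then stop
>   .start()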
> 
> HTH
> 
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
> On Mon, 7 Feb 2022 at 23:06, karan alang wrote:
> Thanks, Mich .. that worked fine!
> 
> 
> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh wrote:
> read below
> 
> """
>"foreach" performs custom write logic on each row and 
> "foreachBatch" performs custom write logic on each micro-batch through 
> SendToBigQuery function
> foreachBatch(SendToBigQuery) expects 2 parameters, first: 
> micro-batch as DataFrame or Dataset and second: unique id for each batch --> 
> batchId
>Using foreachBatch, we write each micro batch to storage 
> defined in our custom logic. In this case, we store the output of our 
> streaming application to Google BigQuery table.
>Note that we are appending data and column "rowkey" is defined 
> as UUID so it can be used as the primary key
> """
> result = streamingDataFrame.select( \
>  col("parsed_value.rowkey").alias("rowkey") \
>, col("parsed_value.ticker").alias("ticker") \
>, col("parsed_value.timeissued").alias("timeissued") \
>, col("parsed_value.price").alias("price")). \
>  writeStream. \
>  outputMode('append'). \
>  option("truncate", "false"). \
>  foreachBatch(SendToBigQuery). \
>  trigger(processingTime='2 seconds'). \
>  start()
> 
> now you define your function SendToBigQuery() 
> 
> def SendToBigQuery(df, batchId):
> if(len(df.take(1))) > 0:
> df.printSchema()
> print(f"""batchId is {batchId}""")
> rows = df.count()
> print(f""" Total records processed in this run = {rows}""")
> ..
> else:
> print("DataFrame is empty")
> 
> HTH
> 
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
> On Mon, 7 Feb 2022 at 21:06, karan alang wrote:
> Hello All,
> 
> I'm using Structured Streaming to read data from Kafka, and I need to do a 
> transformation on each individual row.
> 
> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
> Basic question - how is the row passed to the function when foreach is used?
> 
> Also, when I use foreachBatch, it seems the batchId is available in the called 
> function? How do I access individual rows?
> 
> Details are on Stack Overflow:
> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
> 
> What is the best approach for this use case?
> 
> tia!



Re: Help With unstructured text file with spark scala

2022-02-21 Thread Danilo Sousa
Yes, this is only a single file.

Thanks Rafael Mendes.

> On 13 Feb 2022, at 07:13, Rafael Mendes  wrote:
> 
> Hi, Danilo.
> Do you have a single large file, only?
> If so, I guess you can use tools like sed/awk to split it into more files 
> based on layout, so you can read these files into Spark.
> 
> 
> On Wed, 9 Feb 2022 at 09:30, Bitfox wrote:
> Hi
> 
> I am not sure about the total situation.
> But if you want a Scala implementation, I think you could use regexes to 
> match and capture the keywords.
> Here is one you can modify on your end.
> 
> import scala.io.Source
> import scala.collection.mutable.ArrayBuffer
> 
> val list1 = ArrayBuffer[(String,String,String)]()
> val list2 = ArrayBuffer[(String,String)]()
> 
> 
> val patt1 = """^(.*)#(.*)#([^#]*)$""".r
> val patt2 = """^(.*)#([^#]*)$""".r
> 
> val file = "1.txt"
> val lines = Source.fromFile(file).getLines()
> 
> for (x <- lines) {
>   x match {
>     case patt1(k, v, z) => list1 += ((k, v, z))
>     case patt2(k, v)    => list2 += ((k, v))
>     case _              => println("no match")
>   }
> }
> 
> 
> Now list1 and list2 hold the elements you wanted; you can convert them to 
> DataFrames easily.
> 
> Thanks.
> 
> On Wed, Feb 9, 2022 at 7:20 PM Danilo Sousa <danilosousa...@gmail.com> wrote:
> Hello
> 
> 
> Yes, this block I can open as CSV with the # delimiter, but there is another 
> block that is not in CSV format.
> 
> That block is more like key/value data.
> 
> We have two different layouts in the same file. That is the “problem”.
> 
> Thanks for your time.
> 
> 
> 
>> Relação de Beneficiários Ativos e Excluídos
>> Carteira em#27/12/2019##Todos os Beneficiários
>> Operadora#AMIL
>> Filial#SÃO PAULO#Unidade#Guarulhos
>> 
>> Contrato#123456 - Test
>> Empresa#Test
> 
>> On 9 Feb 2022, at 00:58, Bitfox <bit...@bitfox.top> wrote:
>> 
>> Hello
>> 
>> You can treat it as a CSV file and load it with Spark:
>> 
>> >>> df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").option("sep","#").load(csv_file)
>> >>> df.show()
>> +--------------------+-------------------+-----------------+
>> |               Plano|Código Beneficiário|Nome Beneficiário|
>> +--------------------+-------------------+-----------------+
>> |58693 - NACIONAL ...|           65751353|       Jose Silva|
>> |58693 - NACIONAL ...|           65751388|      Joana Silva|
>> |58693 - NACIONAL ...|           65751353|     Felipe Silva|
>> |58693 - NACIONAL ...|           65751388|      Julia Silva|
>> +--------------------+-------------------+-----------------+
>> 
>> 
>> cat csv_file:
>> 
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>> 
>> 
>> Regards
>> 
>> 
>> On Wed, Feb 9, 2022 at 12:50 AM Danilo Sousa <danilosousa...@gmail.com> wrote:
>> Hi
>> I have to transform unstructured text to dataframe.
>> Could anyone please help with Scala code ?
>> 
>> Dataframe need as:
>> 
>> operadora filial unidade contrato empresa plano codigo_beneficiario 
>> nome_beneficiario
>> 
>> Relação de Beneficiários Ativos e Excluídos
>> Carteira em#27/12/2019##Todos os Beneficiários
>> Operadora#AMIL
>> Filial#SÃO PAULO#Unidade#Guarulhos
>> 
>> Contrato#123456 - Test
>> Empresa#Test
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
>> 58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
>> 
>> Contrato#898011000 - FUNDACAO GERDAU
>> Empresa#FUNDACAO GERDAU
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 



Re: Help With unstructured text file with spark scala

2022-02-09 Thread Danilo Sousa
Hello, how are you?

Thanks for your time

> Does the data contain records? 
Yes
> Are the records "homogeneous"; i.e., do they have the same fields?
Yes, the data is homogeneous but has “two layouts” in the same file.
> What is the format of the data?
All the data is strings, in a .txt file.
> Are records separated by lines/separators?
Yes, the delimiter is “#”, but as said, we have two layouts in the same file.
This one is like key/value:
>Carteira em#27/12/2019##Todos os Beneficiários
>Operadora#AMIL
>Filial#SÃO PAULO#Unidade#Guarulhos
> 
>Contrato#123456 - Test
>Empresa#Test

And this one is like CSV format:

>Plano#Código Beneficiário#Nome Beneficiário
>58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
>58693 - NACIONAL R COPART PJCE#073930313#Maria Silva

> Is the data sharded across multiple files?
No
> How big is each shard?
Approximately 20 GB.
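
Given the size, a rough Spark-native sketch (assuming a SparkSession `spark`; the
path and the regex are only guesses at the layout) that keeps the ~20 GB file out
of driver memory by reading it as text and filtering out the record layout:

import org.apache.spark.sql.functions._

val raw = spark.read.text("data.txt")   // placeholder path

// keep lines shaped like "Plano#Código#Nome" (two '#' with a numeric code in the middle)
val records = raw
  .filter(col("value").rlike("""^[^#]+#\d+#[^#]+$"""))
  .select(
    split(col("value"), "#").getItem(0).as("plano"),
    split(col("value"), "#").getItem(1).as("codigo_beneficiario"),
    split(col("value"), "#").getItem(2).as("nome_beneficiario")
  )

The key/value header lines could be handled with a similar filter and joined in separately.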

> On 8 Feb 2022, at 16:56, Lalwani, Jayesh  wrote:
> 
> You will need to provide more info.
> 
> Does the data contain records? 
> Are the records "homogeneous"; i.e., do they have the same fields?
> What is the format of the data?
> Are records separated by lines/separators?
> Is the data sharded across multiple files?
> How big is each shard?
> 
> 
> 
> On 2/8/22, 11:50 AM, "Danilo Sousa"  wrote:
> 
> 
> 
> 
>Hi
>I have to transform unstructured text to dataframe.
>Could anyone please help with Scala code ?
> 
>Dataframe need as:
> 
>operadora filial unidade contrato empresa plano codigo_beneficiario 
> nome_beneficiario
> 
>Relação de Beneficiários Ativos e Excluídos
>Carteira em#27/12/2019##Todos os Beneficiários
>Operadora#AMIL
>Filial#SÃO PAULO#Unidade#Guarulhos
> 
>Contrato#123456 - Test
>Empresa#Test
>Plano#Código Beneficiário#Nome Beneficiário
>58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
>58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
> 
>Contrato#898011000 - FUNDACAO GERDAU
>Empresa#FUNDACAO GERDAU
>Plano#Código Beneficiário#Nome Beneficiário
>58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>-
>To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Help With unstructured text file with spark scala

2022-02-09 Thread Danilo Sousa
Hello


Yes, this block I can open as CSV with the # delimiter, but there is another
block that is not in CSV format.

That block is more like key/value data.

We have two different layouts in the same file. That is the “problem”.

Thanks for your time.



> Relação de Beneficiários Ativos e Excluídos
> Carteira em#27/12/2019##Todos os Beneficiários
> Operadora#AMIL
> Filial#SÃO PAULO#Unidade#Guarulhos
> 
> Contrato#123456 - Test
> Empresa#Test

> On 9 Feb 2022, at 00:58, Bitfox  wrote:
> 
> Hello
> 
> You can treat it as a CSV file and load it with Spark:
> 
> >>> df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").option("sep","#").load(csv_file)
> >>> df.show()
> +--------------------+-------------------+-----------------+
> |               Plano|Código Beneficiário|Nome Beneficiário|
> +--------------------+-------------------+-----------------+
> |58693 - NACIONAL ...|           65751353|       Jose Silva|
> |58693 - NACIONAL ...|           65751388|      Joana Silva|
> |58693 - NACIONAL ...|           65751353|     Felipe Silva|
> |58693 - NACIONAL ...|           65751388|      Julia Silva|
> +--------------------+-------------------+-----------------+
> 
> 
> cat csv_file:
> 
> Plano#Código Beneficiário#Nome Beneficiário
> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
> 
> 
> Regards
> 
> 
> On Wed, Feb 9, 2022 at 12:50 AM Danilo Sousa <danilosousa...@gmail.com> wrote:
> Hi
> I have to transform unstructured text to dataframe.
> Could anyone please help with Scala code ?
> 
> Dataframe need as:
> 
> operadora filial unidade contrato empresa plano codigo_beneficiario 
> nome_beneficiario
> 
> Relação de Beneficiários Ativos e Excluídos
> Carteira em#27/12/2019##Todos os Beneficiários
> Operadora#AMIL
> Filial#SÃO PAULO#Unidade#Guarulhos
> 
> Contrato#123456 - Test
> Empresa#Test
> Plano#Código Beneficiário#Nome Beneficiário
> 58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
> 58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
> 
> Contrato#898011000 - FUNDACAO GERDAU
> Empresa#FUNDACAO GERDAU
> Plano#Código Beneficiário#Nome Beneficiário
> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Help With unstructured text file with spark scala

2022-02-08 Thread Danilo Sousa
Hi
I have to transform unstructured text into a DataFrame.
Could anyone please help with Scala code?

Dataframe need as:

operadora filial unidade contrato empresa plano codigo_beneficiario 
nome_beneficiario

Relação de Beneficiários Ativos e Excluídos
Carteira em#27/12/2019##Todos os Beneficiários
Operadora#AMIL
Filial#SÃO PAULO#Unidade#Guarulhos

Contrato#123456 - Test
Empresa#Test
Plano#Código Beneficiário#Nome Beneficiário
58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
58693 - NACIONAL R COPART PJCE#073930313#Maria Silva

Contrato#898011000 - FUNDACAO GERDAU
Empresa#FUNDACAO GERDAU
Plano#Código Beneficiário#Nome Beneficiário
58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org