Re: Append to an existing Delta Lake using structured streaming

2021-07-21 Thread eugen . wintersberger
I will try to provide a stripped-down example of what I am doing.

The initial Delta lake is built from a DataFrame like this, from within
a notebook:
import org.apache.spark.sql.functions.{col, month, window, year}

// The raw events carry 'serialno and 'eventType; rename them so the
// groupBy below can refer to "serial" and "type".
val hourly_new = events
  .select(window('timestamp, "1 hour"), 'serialno, 'eventType)
  .select($"window.start".as("start"), 'serialno.as("serial"), 'eventType.as("type"))
  .withWatermark("start", "70 minutes")
  .groupBy("start", "serial", "type")
  .count()
  .withColumn("year", year(col("start")))
  .withColumn("month", month(col("start")))

// COMMAND --

hourly_new.write
  .format("delta")
  .partitionBy("year", "month")
  .save("/path/to/delta-lake")
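
A quick sanity check I can run in the notebook to confirm the batch write
produced a readable Delta table (just a sketch, using the placeholder path
from above):

spark.read.format("delta").load("/path/to/delta-lake").count()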

Once this data has been written to disk, I would like to append to it
using a streaming job:

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val trigger = Trigger.ProcessingTime(1)  // interval in milliseconds

df.writeStream
  .format(format)  // format = "delta" in my case
  .option("path", "/path/to/delta-lake")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode(OutputMode.Append())
  .option("mergeSchema", "true")
  .trigger(trigger)
  .partitionBy("year", "month")
  .start()
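
For completeness, df is the streaming counterpart of the aggregation above.
It looks roughly like this (a sketch with illustrative names and paths; the
real job reads from our event source):

val events_stream = spark.readStream
  .format("delta")          // illustrative source
  .load("/path/to/events")  // hypothetical path

val df = events_stream      // same windowed aggregation as in the notebook code
  .select(window('timestamp, "1 hour"), 'serialno, 'eventType)
  .select($"window.start".as("start"), 'serialno.as("serial"), 'eventType.as("type"))
  .withWatermark("start", "70 minutes")
  .groupBy("start", "serial", "type")
  .count()
  .withColumn("year", year(col("start")))
  .withColumn("month", month(col("start")))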

When I start the streaming job, the Delta lake it points to never gets
any updates; the number of rows stays as it was after running the notebook
code above.
Interestingly, when I let the streaming code above create a brand-new Delta
table and then restart the task, that table is happily updated every hour.
What I do not understand is what I am doing wrong in the notebook code that
prevents the streaming task from appending data to the Delta lake.
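
In case it helps: one way I can check whether the streaming query commits
anything at all is to look at the Delta transaction history (just a sketch,
assuming the Delta Lake library is on the classpath):

import io.delta.tables.DeltaTable

DeltaTable.forPath(spark, "/path/to/delta-lake")
  .history(10)  // last 10 commits
  .select("version", "timestamp", "operation")
  .show(truncate = false)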

Thanks in advance and best regards
   Eugen

On Wed, 2021-07-21 at 19:51 +, Denny Lee wrote:
> Including the Delta Lake Users and Developers DL to help out.
> 
> Saying this, could you clarify how data is not being added?  By any
> chance do you have any code samples to recreate this?  
> 
> 
> Sent via Superhuman
> 
> 
> On Wed, Jul 21, 2021 at 2:49 AM, 
> wrote:
> > Hi all,
> >   I stumbled upon an interesting problem. I have an existing
> > Delta lake with data recovered from a backup and would like to
> > append to this Delta lake using Spark structured streaming. This
> > does not work. Although the streaming job is running, no data is
> > appended.
> > If I create the original table with structured streaming, then
> > appending to it with a streaming job (at least with the same
> > job) works flawlessly.  Did I misunderstand something here?
> > 
> > best regards
> >    Eugen Wintersberger
> 


