Re: Appending a static dataframe to a stream-created Parquet file fails

2021-09-05 Thread eugen . wintersberger
Hi Jungtaek, 
 thanks for your reply. I was afraid that the problem is not only on my
side but rather conceptual in nature. I guess I have to rethink my
approach. However, since you mentioned Delta Lake: I have the same
problem, just the other way around, with Delta Lake. I cannot write with
a stream to a Delta Lake table created from a static dataframe.
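
For the Parquet test itself I will probably just keep the streaming
output and the batch output in separate directories and combine them at
read time, so that the batch write does not go through the directory
managed by the file sink. A rough sketch of what I have in mind (the
paths are just placeholders):

val streamed = spark.read.parquet("/tmp/logs-stream") // written by the streaming query
val batch    = spark.read.parquet("/tmp/logs-batch")  // written by the static dataframe
val combined = streamed.unionByName(batch)            // combined view for the count assertion
combined.count()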

Anyhow, best regards
  Eugen

On Fri, 2021-09-03 at 11:44 +0900, Jungtaek Lim wrote:
> Hi,
> 
> The file stream sink maintains metadata in the output directory.
> The metadata retains the list of files written by the streaming
> query, and Spark reads the metadata when listing the files to read.
> 
> This is to guarantee end-to-end exactly-once semantics when writing
> files in the streaming query. The streaming query could fail and
> some files may be partially written. The metadata helps to skip
> these files and only read the files which were written correctly.
> 
> This leads to a major restriction: you can't write to the output
> directory from multiple queries. In your case, Spark will only read
> the files which were written by the streaming query.
> 
> There are 3rd-party projects dealing with transactional writes from
> multiple writers, (alphabetically) Apache Iceberg, Delta Lake, and
> so on. You may want to check them out.
> 
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 
> On Thu, Sep 2, 2021 at 10:04 PM 
> wrote:
> > Hi all,
> >   I recently stumbled upon a rather strange problem with
> > streaming sources in one of my tests. I am writing a Parquet file
> > from a streaming source and subsequently try to append the same
> > data, but this time from a static dataframe. Surprisingly, the
> > number of rows in the Parquet file remains the same after the
> > append operation.
> > Here is the relevant code:
> > 
> > "Appending data from static dataframe" must "produce twice as much
> > data" in {
> > logLinesStream.writeStream
> > .format("parquet")
> > .option("path", path.toString)
> > .outputMode("append")
> > .start()
> > .processAllAvailable()
> > spark.read.format("parquet").load(path.toString).count mustBe 1159
> > 
> > logLinesDF.write.format("parquet").mode("append").save(path.toStrin
> > g)
> > spark.read.format("parquet").load(path.toString).count mustBe
> > 2*1159
> > }
> > 
> > Does anyone have an idea what I am doing wrong here?
> > 
> > thanks in advance
> >  Eugen Wintersberger



Appending a static dataframe to a stream-created Parquet file fails

2021-09-02 Thread eugen . wintersberger
Hi all,
  I recently stumbled upon a rather strange problem with streaming
sources in one of my tests. I am writing a Parquet file from a
streaming source and subsequently try to append the same data, but
this time from a static dataframe. Surprisingly, the number of rows in
the Parquet file remains the same after the append operation.
Here is the relevant code:

  "Appending data from static dataframe" must "produce twice as much data" in {
logLinesStream.writeStream
  .format("parquet")
  .option("path", path.toString)
  .outputMode("append")
  .start()
  .processAllAvailable()
spark.read.format("parquet").load(path.toString).count mustBe 1159

logLinesDF.write.format("parquet").mode("append").save(path.toString)
spark.read.format("parquet").load(path.toString).count mustBe 2*1159
  }

Does anyone have an idea what I am doing wrong here?

thanks in advance
 Eugen Wintersberger


Re: Append to an existing Delta Lake using structured streaming

2021-07-21 Thread eugen . wintersberger
I will try to provide a stripped-down example of what I am doing.

The initial Delta Lake table is built from a dataframe like this from
within a notebook:

val hourly_new = events
  .select(window('timestamp, "1 hour"), 'serialno, 'eventType)
  .select($"window.start".as("start"), $"serialno".as("serial"), $"eventType".as("type"))
  .withWatermark("start", "70 minutes")
  .groupBy("start", "serial", "type")
  .count()
  .withColumn("year", year(col("start")))
  .withColumn("month", month(col("start")))

// COMMAND ----------

hourly_new.write.format("delta").partitionBy("year", "month").save("/path/to/delta-lake")

Once this data has been written to disk, I would like to append to it
using a streaming job:

val trigger =  Trigger.ProcessingTime(1)
df.writeStream
  .format(format)
  .option("path", "/path/to/delta-lake")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode(OutputMode.Append())
  .option("mergeSchema","true")
  .trigger(trigger)
  .partitionBy("year", "month")
  .start()

When I start the streaming job, the referenced Delta Lake table never
gets any updates; the number of rows remains as it was after the
notebook code above.
Interestingly, when I create a new table with the streaming code above
and then restart the job, the Delta Lake table is happily updated every
hour. What I do not understand is what I am doing wrong in the notebook
code so that the streaming job is unable to append data to the table.
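
One thing I still want to check is whether the streaming query produces
any Delta commits at all. A quick sketch of that check, assuming the
Delta Scala API (io.delta.tables.DeltaTable) is available in the
notebook:

import io.delta.tables.DeltaTable

// Look at the most recent commits of the table the stream should append to;
// if the streaming query commits anything, new versions should show up here.
val table = DeltaTable.forPath(spark, "/path/to/delta-lake")
table.history(10).select("version", "timestamp", "operation").show(false)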

Thanks in advance and best regards
   Eugen

On Wed, 2021-07-21 at 19:51 +, Denny Lee wrote:
> Including the Delta Lake Users and Developers DL to help out.
> 
> Saying this, could you clarify how data is not being added?  By any
> chance do you have any code samples to recreate this?  
> 
> On Wed, Jul 21, 2021 at 2:49 AM, 
> wrote:
> > Hi all,
> >   I stumbled upon an interesting problem. I have an existing
> > Delta Lake table with data recovered from a backup and would like
> > to append to this table using Spark structured streaming. This
> > does not work. Although the streaming job is running, no data is
> > appended.
> > If I create the original table with structured streaming, then
> > appending to it with a streaming job (at least with the same job)
> > works flawlessly. Did I misunderstand something here?
> > 
> > best regards
> >    Eugen Wintersberger
> 



Append to an existing Delta Lake using structured streaming

2021-07-21 Thread eugen . wintersberger
Hi all,
  I stumbled upon an interesting problem. I have an existing Delta Lake
table with data recovered from a backup and would like to append to
this table using Spark structured streaming. This does not work.
Although the streaming job is running, no data is appended.
If I create the original table with structured streaming, then
appending to it with a streaming job (at least with the same job) works
flawlessly. Did I misunderstand something here?

best regards
   Eugen Wintersberger


Kafka structured streaming - how to read headers

2020-12-03 Thread eugen . wintersberger
Hi folks,
  I am trying to read the message headers from a Kafka structured
stream; they should end up in a column named ``headers``.
I am trying something like this:

val stream = sparkSession.readStream.format("kafka")...load()
stream.map(row => {
  ...
  val headers = row.getAs[HeaderT]("headers")
})

My question is: what should HeaderT be?
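
For completeness, here is a more self-contained version of what I am
trying. My working assumption, which I have not been able to confirm,
is that the headers arrive as an array of key/value structs, i.e.
Seq[Row] on the Scala side (broker and topic below are placeholders):

import org.apache.spark.sql.Row
import sparkSession.implicits._

val stream = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "logs")                         // placeholder topic
  .option("includeHeaders", "true")                    // assumption: exposes the headers column
  .load()

// Assumption: each header is a struct of (key: String, value: Array[Byte]),
// so the whole column comes back as a Seq[Row].
val headerKeys = stream.map { row =>
  val headers = row.getAs[Seq[Row]]("headers")
  if (headers == null) "" else headers.map(_.getAs[String]("key")).mkString(",")
}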

Thanks in advance

 Eugen


Fitting only the intercept for LinearRegression

2020-03-21 Thread eugen . wintersberger
Hi,
  I was wondering if it would be possible to fit only the intercept on
a LinearRegression instance by providing a known coefficient.

Here is some background information: we have a problem where linear
regression is well suited as a predictor. However, the model requires
continuous adaptation. During an initial training, the coefficient and
the intercept of the linear model are determined from a given set of
training data. Later, the model has to be adapted: the intercept must
be re-fitted to a new set of training data, while the coefficient (in
other words, the slope) remains the same as obtained from the initial
model.

I had a look at the Java API for LinearRegression and could not find a
way to fit only the intercept or to set initial parameters for a fit.
Am I missing something? Is there a way to do this with the
LinearRegression class in Spark's ML package, or do I have to use a
different approach?
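
If this is not supported, the fallback I am considering is to keep the
known slope fixed and compute the new intercept by hand: with the
coefficient held fixed, the least-squares intercept is just the mean
residual. A rough sketch (the dataframe and column names are
placeholders):

import org.apache.spark.sql.functions.{avg, col}

// `newTrainingDF` with columns "feature" and "label" stands in for the new
// training data; `knownCoef` is the slope obtained from the initial model.
val knownCoef = 2.5

// With the slope fixed, minimizing the squared error over the intercept
// alone gives: intercept = mean(label - knownCoef * feature).
val intercept = newTrainingDF
  .select(avg(col("label") - col("feature") * knownCoef))
  .first()
  .getDouble(0)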

Thanks in advance.

regards
 Eugen 


