Re: Appending a static dataframe to a stream-created Parquet file fails

2021-09-06 Thread Jungtaek Lim
I'd recommend getting in touch with the Delta Lake community (Google Groups,
https://groups.google.com/forum/#!forum/delta-users) to get more feedback
from experts on Delta Lake-specific issues.



On Mon, Sep 6, 2021 at 1:56 AM  wrote:

> Hi Jungtaek,
>  thanks for your reply. I was afraid that the problem is not just on my
> side but rather conceptual in nature. I guess I have to rethink my
> approach. However, since you mentioned Delta Lake: I have the same
> problem, but the other way around, with Delta Lake. I cannot write with a
> stream to a Delta Lake table created from a static dataframe.
>
> Anyhow, best regards
>   Eugen
>
> On Fri, 2021-09-03 at 11:44 +0900, Jungtaek Lim wrote:
>
> Hi,
>
> The file stream sink maintains metadata in the output directory. The
> metadata records the list of files written by the streaming query, and
> Spark consults it when listing the files to read.
>
> This guarantees end-to-end exactly-once semantics when writing files in a
> streaming query. The query could fail and leave some files partially
> written; the metadata lets readers skip those files and read only the
> ones which were committed correctly.
>
> This leads to a major restriction: you can't write to the output directory
> from multiple queries. In your case, Spark will only read the files which
> are written by the streaming query.
>
> There are third-party projects dealing with transactional writes from
> multiple writers, (alphabetically) Apache Iceberg, Delta Lake, and so on.
> You may want to check them out.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Sep 2, 2021 at 10:04 PM  wrote:
>
> Hi all,
>   I recently stumbled upon a rather strange problem with streaming
> sources in one of my tests. I am writing a Parquet file from a streaming
> source and subsequently try to append the same data, but this time from a
> static dataframe. Surprisingly, the number of rows in the Parquet file
> remains the same after the append operation.
> Here is the relevant code:
>
>   "Appending data from static dataframe" must "produce twice as much data" in 
> {
>
> logLinesStream.writeStream
>
>   .format("parquet")
>
>   .option("path", path.toString)
>
>   .outputMode("append")
>
>   .start()
>
>   .processAllAvailable()
>
> spark.read.format("parquet").load(path.toString).count mustBe 1159
>
>
> logLinesDF.write.format("parquet").mode("append").save(path.toString)
>
> spark.read.format("parquet").load(path.toString).count mustBe 2*1159
>
>   }
>
>
> Does anyone have an idea what I am doing wrong here?
>
> thanks in advance
>  Eugen Wintersberger
>
>
>


Re: Appending a static dataframe to a stream-created Parquet file fails

2021-09-05 Thread eugen.wintersberger
Hi Jungtaek, 
 thanks for your reply. I was afraid that the problem is not just on my
side but rather conceptual in nature. I guess I have to rethink my
approach. However, since you mentioned Delta Lake: I have the same
problem, but the other way around, with Delta Lake. I cannot write with
a stream to a Delta Lake table created from a static dataframe.
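
Roughly what I am attempting, as a sketch (this assumes the delta-spark
package is on the classpath; logLinesDF, logLinesStream, tablePath and
checkpointPath are names from my test setup):

  // Create the Delta table from the static dataframe first ...
  logLinesDF.write.format("delta").save(tablePath)

  // ... then try to append to the same table from a stream. Delta
  // streaming sinks require a checkpoint location.
  logLinesStream.writeStream
    .format("delta")
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    .start(tablePath)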

Anyhow, best regards
  Eugen

On Fri, 2021-09-03 at 11:44 +0900, Jungtaek Lim wrote:
> Hi,
> 
> The file stream sink maintains metadata in the output directory.
> The metadata records the list of files written by the streaming
> query, and Spark consults it when listing the files to read.
> 
> This guarantees end-to-end exactly-once semantics when writing files
> in a streaming query. The query could fail and leave some files
> partially written; the metadata lets readers skip those files and
> read only the ones which were committed correctly.
> 
> This leads to a major restriction: you can't write to the output
> directory from multiple queries. In your case, Spark will only read
> the files which are written by the streaming query.
> 
> There are third-party projects dealing with transactional writes
> from multiple writers, (alphabetically) Apache Iceberg, Delta Lake,
> and so on. You may want to check them out.
> 
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 
> On Thu, Sep 2, 2021 at 10:04 PM  wrote:
> > Hi all,
> >   I recently stumbled upon a rather strange problem with
> > streaming sources in one of my tests. I am writing a Parquet file
> > from a streaming source and subsequently try to append the same
> > data, but this time from a static dataframe. Surprisingly, the
> > number of rows in the Parquet file remains the same after the
> > append operation.
> > Here is the relevant code:
> > 
> > "Appending data from static dataframe" must "produce twice as much
> > data" in {
> > logLinesStream.writeStream
> > .format("parquet")
> > .option("path", path.toString)
> > .outputMode("append")
> > .start()
> > .processAllAvailable()
> > spark.read.format("parquet").load(path.toString).count mustBe 1159
> > 
> > logLinesDF.write.format("parquet").mode("append").save(path.toStrin
> > g)
> > spark.read.format("parquet").load(path.toString).count mustBe
> > 2*1159
> > }
> > 
> > Does anyone have an idea what I am doing wrong here?
> > 
> > thanks in advance
> >  Eugen Wintersberger



Re: Appending a static dataframe to a stream-created Parquet file fails

2021-09-02 Thread Jungtaek Lim
Hi,

The file stream sink maintains metadata in the output directory. The
metadata records the list of files written by the streaming query, and
Spark consults it when listing the files to read.

This guarantees end-to-end exactly-once semantics when writing files in a
streaming query. The query could fail and leave some files partially
written; the metadata lets readers skip those files and read only the ones
which were committed correctly.
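
To see this concretely, here is a small sketch (assuming the sink writes to
a local directory whose path is held in a string outputPath) that dumps the
sink's metadata log:

  import java.nio.file.{Files, Paths}

  // Each committed batch leaves a log file under _spark_metadata listing
  // the files that belong to it; readers trust only the files named there.
  Files.list(Paths.get(outputPath, "_spark_metadata")).forEach { p =>
    println(s"--- ${p.getFileName}")
    Files.readAllLines(p).forEach(line => println(line))
  }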

This leads to a major restriction: you can't write to the output directory
from multiple queries. In your case, Spark will only read the files which
are written by the streaming query.
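
If you need both a streaming writer and a batch writer with plain Parquet,
one possible workaround (just a sketch; the directory layout is only an
example) is to give each writer its own directory and union them at read
time:

  // The streaming query gets its own directory with its own _spark_metadata.
  logLinesStream.writeStream
    .format("parquet")
    .option("path", s"$base/stream")
    .option("checkpointLocation", s"$base/checkpoint")
    .outputMode("append")
    .start()

  // Batch appends go to a sibling directory which has no sink metadata.
  logLinesDF.write.mode("append").parquet(s"$base/batch")

  // Read both directories and union them.
  val all = spark.read.parquet(s"$base/stream")
    .union(spark.read.parquet(s"$base/batch"))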

There are third-party projects dealing with transactional writes from
multiple writers, (alphabetically) Apache Iceberg, Delta Lake, and so on.
You may want to check them out.
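
For example, with Delta Lake (a sketch, assuming the delta-spark package is
available), a streaming query and a batch job can append to the same table
and both writes stay visible to readers:

  // Streaming append into a Delta table.
  logLinesStream.writeStream
    .format("delta")
    .option("checkpointLocation", s"$tablePath/_checkpoint")
    .outputMode("append")
    .start(tablePath)

  // Batch append into the same table. Delta's transaction log takes over
  // the role of the file sink's _spark_metadata, so both writers are tracked.
  logLinesDF.write.format("delta").mode("append").save(tablePath)

  spark.read.format("delta").load(tablePath).count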

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Sep 2, 2021 at 10:04 PM  wrote:

> Hi all,
>   I recently stumbled upon a rather strange problem with streaming
> sources in one of my tests. I am writing a Parquet file from a streaming
> source and subsequently try to append the same data, but this time from a
> static dataframe. Surprisingly, the number of rows in the Parquet file
> remains the same after the append operation.
> Here is the relevant code:
>
>   "Appending data from static dataframe" must "produce twice as much data" in 
> {
>
> logLinesStream.writeStream
>
>   .format("parquet")
>
>   .option("path", path.toString)
>
>   .outputMode("append")
>
>   .start()
>
>   .processAllAvailable()
>
> spark.read.format("parquet").load(path.toString).count mustBe 1159
>
>
> logLinesDF.write.format("parquet").mode("append").save(path.toString)
>
> spark.read.format("parquet").load(path.toString).count mustBe 2*1159
>
>   }
>
>
> Does anyone have an idea what I am doing wrong here?
>
> thanks in advance
>  Eugen Wintersberger
>


Appending a static dataframe to a stream-created Parquet file fails

2021-09-02 Thread eugen.wintersberger
Hi all,
  I recently stumbled upon a rather strange problem with streaming
sources in one of my tests. I am writing a Parquet file from a
streaming source and subsequently try to append the same data, but this
time from a static dataframe. Surprisingly, the number of rows in the
Parquet file remains the same after the append operation.
Here is the relevant code:

  "Appending data from static dataframe" must "produce twice as much data" in {
logLinesStream.writeStream
  .format("parquet")
  .option("path", path.toString)
  .outputMode("append")
  .start()
  .processAllAvailable()
spark.read.format("parquet").load(path.toString).count mustBe 1159

logLinesDF.write.format("parquet").mode("append").save(path.toString)
spark.read.format("parquet").load(path.toString).count mustBe 2*1159
  }
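
For what it is worth, a quick way to inspect what actually lands on disk is
the following sketch (plain java.nio; path is the same value as in the test
above):

  import java.nio.file.{Files, Paths}

  // List everything the two writes produced, part files as well as any
  // metadata directories that came along with them.
  Files.list(Paths.get(path.toString)).forEach(p => println(p.getFileName))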

Does anyone have an idea what I am doing wrong here?

thanks in advance
 Eugen Wintersberger