Re: Appending a static dataframe to a stream-created Parquet file fails
I'd recommend getting in touch with the Delta Lake community (Google Groups), https://groups.google.com/forum/#!forum/delta-users, to get more feedback from experts about Delta Lake-specific issues.
Re: Appending a static dataframe to a stream-created Parquet file fails
Hi Jungtaek,
thanks for your reply. I was afraid that the problem is not just on my side but rather conceptual in nature. I guess I will have to rethink my approach. However, since you mentioned Delta Lake: I have the same problem, but the other way around, with Delta Lake. I cannot write from a stream to a Delta Lake table created from a static dataframe.

Anyhow, best regards
Eugen
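For reference, the pattern Delta Lake documents for combining a batch-created table with a streaming append looks roughly like the sketch below. This is only a sketch, assuming Spark 3.x with the delta-core package on the classpath; tablePath and checkpointPath are placeholder values, and logLinesDF / logLinesStream stand in for the dataframes from the thread. If this pattern still fails, the Delta users group linked in the reply above is the right place to follow up.

    // tablePath and checkpointPath are placeholder locations, not from the thread.
    // Create the table once from the static dataframe.
    logLinesDF.write
      .format("delta")
      .save(tablePath)

    // Then append to the same table from the streaming query; Delta requires
    // a checkpoint location for streaming writes.
    logLinesStream.writeStream
      .format("delta")
      .option("checkpointLocation", checkpointPath)
      .outputMode("append")
      .start(tablePath)
      .processAllAvailable()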
Re: Appending a static dataframe to a stream-created Parquet file fails
Hi,

The file stream sink maintains metadata in the output directory. The metadata retains the list of files written by the streaming query, and Spark reads that metadata when listing the files to read.

This is to guarantee end-to-end exactly-once semantics when writing files from a streaming query. The streaming query may fail and leave some files partially written; the metadata makes it possible to skip those files and read only the ones that were written correctly.

This leads to a major restriction: you can't write to the output directory from multiple queries. In your case, Spark will only read the files that were written by the streaming query.

There are third-party projects dealing with transactional writes from multiple writers, (alphabetically) Apache Iceberg, Delta Lake, and so on. You may want to check them out.

Thanks,
Jungtaek Lim (HeartSaVioR)
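A batch read of such a directory goes through this metadata log, so part files appended by a separate batch write are ignored even though they exist on disk. A minimal sketch of how to make that visible, assuming a local filesystem path and the spark and path values from the test below; the Hadoop FileSystem listing is illustrative and not part of the original thread:

    // Illustrative only: spark and path come from the test in the original post.
    import org.apache.hadoop.fs.{FileSystem, Path => HPath}

    val outDir = new HPath(path.toString)
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    // Files committed by the streaming query are tracked here; files appended
    // by the batch write are not, so a read of outDir will not return their rows.
    fs.listStatus(new HPath(outDir, "_spark_metadata"))
      .foreach(s => println(s"metadata log entry: ${s.getPath}"))

    // Everything physically present in the directory, including batch-appended
    // part files that the metadata log does not know about.
    fs.listStatus(outDir)
      .filter(_.getPath.getName.endsWith(".parquet"))
      .foreach(s => println(s"part file on disk:  ${s.getPath}"))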
Appending a static dataframe to a stream-created Parquet file fails
Hi all,

I recently stumbled upon a rather strange problem with streaming sources in one of my tests. I am writing a Parquet file from a streaming source and subsequently try to append the same data, but this time from a static dataframe. Surprisingly, the number of rows in the Parquet file remains the same after the append operation. Here is the relevant code:

    "Appending data from static dataframe" must "produce twice as much data" in {
      logLinesStream.writeStream
        .format("parquet")
        .option("path", path.toString)
        .outputMode("append")
        .start()
        .processAllAvailable()
      spark.read.format("parquet").load(path.toString).count mustBe 1159

      logLinesDF.write.format("parquet").mode("append").save(path.toString)
      spark.read.format("parquet").load(path.toString).count mustBe 2 * 1159
    }

Does anyone have an idea what I am doing wrong here?

thanks in advance
Eugen Wintersberger
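Given the restriction described earlier in the thread, one way to make a test like this pass is to keep the streaming and the batch output in separate directories and read them together. A sketch under that assumption (the stream/static sub-paths and the multi-path read are illustrative, not the original author's code, and path is assumed to be a java.nio.file.Path as the .toString usage suggests):

    "Appending data from static dataframe" must "produce twice as much data" in {
      // Illustrative sub-paths: each writer gets its own directory so the
      // streaming sink's metadata log only governs its own files.
      val streamPath = path.resolve("stream").toString
      val staticPath = path.resolve("static").toString

      logLinesStream.writeStream
        .format("parquet")
        .option("path", streamPath)
        .outputMode("append")
        .start()
        .processAllAvailable()

      logLinesDF.write.format("parquet").mode("append").save(staticPath)

      // Reading both directories returns the rows from both writers.
      spark.read.parquet(streamPath, staticPath).count mustBe 2 * 1159
    }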