Re: Appending a static dataframe to a stream-created Parquet file fails
Hi Jungtaek,

thanks for your reply. I was afraid that the problem was not only on my side but rather conceptual in nature. I guess I have to rethink my approach. However, since you mentioned Delta Lake: I have the same problem, but the other way around, with Delta Lake. I cannot write with a stream to a Delta Lake created from a static dataframe.

Anyhow, best regards
  Eugen

On Fri, 2021-09-03 at 11:44 +0900, Jungtaek Lim wrote:
> Hi,
>
> The file stream sink maintains the metadata in the output directory.
> The metadata retains the list of files written by the streaming
> query, and Spark reads the metadata on listing the files to read.
>
> This is to guarantee end-to-end exactly-once on writing files in the
> streaming query. There could be a failure in the streaming query and
> some files may be partially written. The metadata helps to skip
> reading these files and to read only the files which were written
> correctly.
>
> This leads to a major restriction: you can't write to the output
> directory from multiple queries. In your case, Spark will only read
> the files which were written from the streaming query.
>
> There are 3rd-party projects dealing with transactional writes from
> multiple writers, (alphabetically) Apache Iceberg, Delta Lake, and so
> on. You may want to check them out.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Sep 2, 2021 at 10:04 PM  wrote:
> > Hi all,
> > I recently stumbled upon a rather strange problem with streaming
> > sources in one of my tests. I am writing a Parquet file from a
> > streaming source and subsequently try to append the same data, but
> > this time from a static dataframe. Surprisingly, the number of rows
> > in the Parquet file remains the same after the append operation.
> > Here is the relevant code:
> >
> >   "Appending data from static dataframe" must "produce twice as much data" in {
> >     logLinesStream.writeStream
> >       .format("parquet")
> >       .option("path", path.toString)
> >       .outputMode("append")
> >       .start()
> >       .processAllAvailable()
> >     spark.read.format("parquet").load(path.toString).count mustBe 1159
> >
> >     logLinesDF.write.format("parquet").mode("append").save(path.toString)
> >     spark.read.format("parquet").load(path.toString).count mustBe 2 * 1159
> >   }
> >
> > Does anyone have an idea what I am doing wrong here?
> >
> > thanks in advance
> > Eugen Wintersberger
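The commit log Jungtaek describes lives in a `_spark_metadata` subdirectory of the streaming sink's output path. A quick way to confirm that this is what hides the statically appended files is to check for that directory. This is only a sketch, assuming `spark` is an active SparkSession and `path` is the output directory from the test above:

    // Sketch: illustrates the _spark_metadata behaviour described above.
    import org.apache.hadoop.fs.Path

    val outputDir = new Path(path.toString)
    val fs = outputDir.getFileSystem(spark.sparkContext.hadoopConfiguration)

    // The file stream sink writes its commit log here. When this directory
    // exists, spark.read.parquet(...) lists files from the log, not from the
    // file system -- so files appended by a static batch write are never seen.
    val metadataDir = new Path(outputDir, "_spark_metadata")
    println(s"metadata log present: ${fs.exists(metadataDir)}")

One pragmatic workaround under the restriction above is to keep the batch and streaming outputs in separate directories and union them at read time, e.g. `spark.read.parquet(streamDir).union(spark.read.parquet(batchDir))`.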
Appending a static dataframe to a stream-created Parquet file fails
Hi all,

I recently stumbled upon a rather strange problem with streaming sources in one of my tests. I am writing a Parquet file from a streaming source and subsequently try to append the same data, but this time from a static dataframe. Surprisingly, the number of rows in the Parquet file remains the same after the append operation. Here is the relevant code:

  "Appending data from static dataframe" must "produce twice as much data" in {
    logLinesStream.writeStream
      .format("parquet")
      .option("path", path.toString)
      .outputMode("append")
      .start()
      .processAllAvailable()
    spark.read.format("parquet").load(path.toString).count mustBe 1159

    logLinesDF.write.format("parquet").mode("append").save(path.toString)
    spark.read.format("parquet").load(path.toString).count mustBe 2 * 1159
  }

Does anyone have an idea what I am doing wrong here?

thanks in advance
  Eugen Wintersberger
Re: Append to an existing Delta Lake using structured streaming
I will try to provide a stripped-down example of what I am doing. The initial Delta Lake is built from a dataframe like this, from within a notebook:

  val hourly_new = events.select(window('timestamp, "1 hour"), 'serial, 'type)
    .select($"window.start".as("start"), 'serial, 'type)
    .withWatermark("start", "70 minutes")
    .groupBy("start", "serial", "type")
    .count()
    .withColumn("year", year(col("start")))
    .withColumn("month", month(col("start")))

  // COMMAND --
  hourly_new.write.format("delta").partitionBy("year", "month").save("/path/to/delta-lake")

Once this data has been written to disk, I would like to append to it using a streaming task:

  val trigger = Trigger.ProcessingTime(1)
  df.writeStream
    .format(format)
    .option("path", "/path/to/delta-lake")
    .option("checkpointLocation", "/path/to/checkpoint")
    .outputMode(OutputMode.Append())
    .option("mergeSchema", "true")
    .trigger(trigger)
    .partitionBy("year", "month")
    .start()

When I start the streaming job, the Delta Lake it references never gets any updates; the number of rows remains as it was after the notebook code above. Interestingly, when I create a new file with the streaming code above and then restart the task, the Delta Lake is happily updated every hour. What I do not understand is what I did wrong in the notebook code, so that the streaming task is unable to append the data to the Delta Lake.

Thanks in advance and best regards
  Eugen

On Wed, 2021-07-21 at 19:51 +, Denny Lee wrote:
> Including the Delta Lake Users and Developers DL to help out.
>
> Saying this, could you clarify how data is not being added? By any
> chance do you have any code samples to recreate this?
>
> Sent via Superhuman
>
> On Wed, Jul 21, 2021 at 2:49 AM,  wrote:
> > Hi all,
> > I stumbled upon an interesting problem. I have an existing
> > Delta Lake with data recovered from a backup and would like to
> > append to this Delta Lake using Spark structured streaming. This
> > does not work. Although the streaming job is running, no data is
> > appended. If I create the original file with structured streaming,
> > then appending to this file with a streaming job (at least with the
> > same job) works flawlessly. Did I misunderstand something here?
> >
> > best regards
> > Eugen Wintersberger
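One possible explanation, not confirmed in this thread, is the checkpoint location: if the streaming query reuses a checkpoint directory from an earlier run that wrote to a different (or since-replaced) table, it replays the old commit state and may appear to append nothing. A hedged sketch of a streaming append to a batch-created Delta table, with paths and the `df` source taken as placeholders from the mails above:

    // Sketch only: streaming append to an existing Delta table.
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}

    df.writeStream
      .format("delta")
      .outputMode(OutputMode.Append())
      // Use a checkpoint location that has never been used for another
      // query/table; a stale checkpoint is a common cause of "no updates".
      .option("checkpointLocation", "/path/to/fresh-checkpoint")
      // Partitioning must match the existing table (year, month).
      .partitionBy("year", "month")
      .trigger(Trigger.ProcessingTime("1 hour"))
      .start("/path/to/delta-lake")

Checking the table's history (e.g. `DeltaTable.forPath(spark, "/path/to/delta-lake").history()`) shows whether the streaming query is committing at all.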
Append to an existing Delta Lake using structured streaming
Hi all, I stumbled upon an interessting problem. I have an existing Deltalake with data recovered from a backup and would like to append to this Deltalake using Spark structured streaming. This does not work. Although the streaming job is running no data is appended. If I created the original file with structured streaming than appending to this file with a streaming job (at least with the same job) works flawlessly. Did I missunderstand something here? best regards Eugen Wintersberger
Kafka structured streaming - how to read headers
Hi folks,

I am trying to read the message headers from a Kafka structured stream; they should be stored in a column named ``headers``. I am trying something like this:

  val stream = sparkSession.readStream.format("kafka"). ... .load()
  stream.map(row => {
    ...
    val headers = row.getAs[HeaderT]("headers")
  })

My question is: what would HeaderT be?

Thanks in advance
  Eugen
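For reference, in Spark 3.0+ the `headers` column has the SQL type ARRAY<STRUCT<key: STRING, value: BINARY>>, and it is only populated when `includeHeaders` is enabled on the source. Per row it can be read as a sequence of Rows. A sketch with placeholder broker and topic names:

    // Sketch: reading Kafka record headers in structured streaming (Spark 3.0+).
    import org.apache.spark.sql.Row
    import sparkSession.implicits._

    val stream = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "mytopic")                       // placeholder
      .option("includeHeaders", "true") // headers column is absent otherwise
      .load()

    // So HeaderT is effectively Seq[Row], each Row holding (key, value).
    val decoded = stream.map { row =>
      val headers: Seq[Row] = row.getAs[Seq[Row]]("headers")
      headers.map { h =>
        h.getAs[String]("key") -> new String(h.getAs[Array[Byte]]("value"), "UTF-8")
      }.toMap
    }

Note that `map` on a Dataset needs an encoder, hence the `implicits` import.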
Fitting only the intercept for LinearRegression
Hi,

I was wondering if it would be possible to fit only the intercept of a LinearRegression instance by providing a known coefficient?

Here is some background information: we have a problem where linear regression is well suited as a predictor. However, the model requires continuous adaptation. During an initial training, the coefficient and the intercept of the linear model are determined from a given set of training data. Later, the model has to be adapted: the intercept must be refitted to a new set of training data, while the coefficient (in other words, the slope) remains the same as obtained from the initial model.

I had a look at the Java API for LinearRegression and could not find a way to fit only the intercept and set initial parameters for a fit. Am I missing something? Is there a way to do this with the LinearRegression class in Spark's ML package, or do I have to use a different approach?

Thanks in advance.

regards
  Eugen
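As far as I can tell, Spark's LinearRegression does not support freezing coefficients. However, with the slope b held fixed, minimising the squared error sum((y - (a + b*x))^2) over the intercept a alone has the closed form a = mean(y) - b * mean(x) (and, for multiple features, a = mean(y) - w . mean(x)), so no iterative fit is needed. A sketch assuming a DataFrame `train` with columns "x" and "y" and a slope `b` from the initial model:

    // Sketch: refit only the intercept with a fixed slope, via the closed form
    //   intercept = mean(y) - b * mean(x)
    import org.apache.spark.sql.functions.avg

    val b = 1.23 // slope from the initial model (placeholder value)
    val means = train.agg(avg("y"), avg("x")).head()
    val intercept = means.getDouble(0) - b * means.getDouble(1)

The refitted predictor is then simply y = intercept + b * x, outside of the LinearRegression class.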