RE: Why is Spark 3.0.x faster than Spark 3.1.x

2021-05-17 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi Maziyar, Mich

Do we have any ticket to track this? Any idea if this is going to be fixed in 
3.1.2?

Thanks and Regards,
Abhishek

From: Mich Talebzadeh 
Sent: Friday, April 9, 2021 2:11 PM
To: Maziyar Panahi 
Cc: User 
Subject: Re: Why is Spark 3.0.x faster than Spark 3.1.x


Hi,

Regarding your point:

 I won't be able to defend this request by telling Spark users the previous 
major release was and still is more stable than the latest major release ...

With the benefit of hindsight, version 3.1.1 was released only recently, and the 
notion of "stable" (from a practical point of view) does not really apply to it 
yet. That is perhaps the reason why some vendors like Cloudera stay a few releases 
behind the latest version. In production, what matters most is 
predictability and stability. You are not doing anything wrong by rolling it 
back and awaiting further clarification and resolution of the error.

HTH


view my Linkedin profile



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 9 Apr 2021 at 08:58, Maziyar Panahi <maziyar.pan...@iscpif.fr> wrote:
Thanks Mich, I will ask all of our users to use pyspark 3.0.x and will change 
all the notebooks/scripts to switch back from 3.1.1 to 3.0.2.

That being said, I won't be able to defend this request by telling Spark 
users the previous major release was and still is more stable than the latest 
major release, which everything now defaults to (pyspark, 
downloads, etc.).

I'll see if I can open a ticket for this as well.


On 8 Apr 2021, at 17:27, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Well, the normal course of action (considering the law of diminishing returns) is 
that your mileage may vary:

Spark 3.0.1 is pretty stable and good enough. Unless there is an overriding 
reason why you have to use 3.1.1, you can set it aside and try it when you have 
other use cases. For now I guess you can carry on with 3.0.1 as BAU.

HTH


 
view my Linkedin profile


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 8 Apr 2021 at 16:19, Maziyar Panahi <maziyar.pan...@iscpif.fr> wrote:
I personally added the following to my SparkSession in 3.1.1 and the result 
was exactly the same as before (local master). 3.1.1 is still 4-5 times 
slower than 3.0.2, at least for that piece of code. I will do more investigation 
to see how it does with other workloads, especially anything without .transform or 
Spark ML related functions, but the small code I provided, on any dataset that 
is big enough to take a minute to finish, will show you the slowdown going 
from 3.0.2 to 3.1.1 by a factor of 4-5:


.config("spark.sql.adaptive.coalescePartitions.enabled", "false")
.config("spark.sql.adaptive.enabled", "false")



On 8 Apr 2021, at 16:47, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

spark 3.1.1

I enabled the parameter

spark_session.conf.set("spark.sql.adaptive.enabled", "true")

to see its effects

on a yarn cluster, i.e. spark-submit --master yarn --deploy-mode client

with 4 executors it crashed the cluster.

I then reduced the number of executors to 2 and this time it ran OK, but the 
performance was worse.

I assume it adds some overhead?



 
view my Linkedin profile


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 8 Apr 2021 at 15:05, Maziyar Panahi <maziyar.pan...@iscpif.fr> wrote:
Thanks Sean,

I h

Re: Merge two dataframes

2021-05-17 Thread Andrew Melo
On Mon, May 17, 2021 at 2:31 PM Lalwani, Jayesh  wrote:
>
> If the UDFs are computationally expensive, I wouldn't solve this problem with 
>  UDFs at all. If they are working in an iterative manner, and assuming each 
> iteration is independent of other iterations (yes, I know that's a big 
> assumption), I would think about exploding your dataframe to have a row per 
> iteration, and working on each row separately, and then aggregating in the 
> end. This allows you to scale your computation much better.

Ah, in this case, I mean "iterative" in the
"code/run/examine" sense of the word, not that the UDF itself is
performing an iterative computation.

>
> I know not all computations can be map-reducible like that. However, most can.
>
> Split and merge data workflows in Spark don't work like their DAG 
> representations, unless you add costly caches. Without caching, each split 
> will result in Spark rereading data from the source, even if the splits are 
> getting merged together. The only way to avoid it is by caching at the split 
> point, which depending on the amount of data can become costly. Also, joins 
> result in shuffles. Avoiding splits and merges is better.
>
> To give you an example, we had an application that applied a series of rules 
> to rows. The output required was a dataframe with an additional column that 
> indicated which rule the row satisfied. In our initial implementation, we had 
> a series of dataframes, one per rule. For N rules, we created N dataframes that had the 
> rows that satisfied the rules. Then we unioned the N data frames. Horrible 
> performance that didn't scale with N. We reimplemented to add N Boolean 
> columns, one per rule, that indicated if the rule was satisfied. We just kept 
> adding the Boolean columns to the dataframe. After iterating over the rules, 
> we added another column that indicated which rule was satisfied, and then 
> dropped the Boolean columns. Much better performance that scaled with N. 
> Spark read from the datasource just once, and since there were no joins/unions, 
> there was no shuffle.

The hitch in your example, and what we're trying to avoid, is that if
you need to change one of these boolean columns, you end up needing to
recompute everything "afterwards" in the DAG (AFAICT), even if the
"latter" stages don't have a true dependency on the changed column. We
do explorations of very large physics datasets, and one of the
disadvantages of our bespoke analysis software is that any change to
the analysis code involves re-computing everything from scratch. A big
goal of mine is to make it so that what was changed is recomputed, and
no more, which will speed up the rate at which we can find new
physics.

Cheers
Andrew

>
> On 5/17/21, 2:56 PM, "Andrew Melo"  wrote:
>
>
> In our case, these UDFs are quite expensive and worked on in an
> iterative manner, so being able to cache the two "sides" of the graphs
> independently will speed up the development cycle. Otherwise, if you
> modify foo() here, then you have to recompute bar and baz, even though
> they're unchanged.
>
> df.withColumn('a', foo('x')).withColumn('b', bar('x')).withColumn('c', 
> baz('x'))
>
> Additionally, a longer goal would be to be able to persist/cache these
> columns to disk so a downstream user could later mix and match several
> (10s) of these columns together as their inputs w/o having to
> explicitly compute them themselves.
>
> Cheers
> Andrew
>
> On Mon, May 17, 2021 at 1:10 PM Sean Owen  wrote:
> >
> > Why join here - just add two columns to the DataFrame directly?
> >
> > On Mon, May 17, 2021 at 1:04 PM Andrew Melo  
> wrote:
> >>
> >> Anyone have ideas about the below Q?
> >>
> >> It seems to me that, given that "diamond" DAG, Spark could see
> >> that the rows haven't been shuffled/filtered and do some type of
> >> "zip join" to push them together, but I've not been able to get a plan
> >> that doesn't do a hash/sort merge join
> >>
> >> Cheers
> >> Andrew
> >>
>



Re: Merge two dataframes

2021-05-17 Thread Lalwani, Jayesh
If the UDFs are computationally expensive, I wouldn't solve this problem with  
UDFs at all. If they are working in an iterative manner, and assuming each 
iteration is independent of other iterations (yes, I know that's a big 
assumption), I would think about exploding your dataframe to have a row per 
iteration, and working on each row separately, and then aggregating in the end. 
This allows you to scale your computation much better. 

I know not all computations can be map-reducible like that. However, most can. 

Split and merge data workflows in Spark don't work like their DAG 
representations, unless you add costly caches. Without caching, each split will 
result in Spark rereading data from the source, even if the splits are getting 
merged together. The only way to avoid it is by caching at the split point, 
which depending on the amount of data can become costly. Also, joins result in 
shuffles. Avoiding splits and merges is better.
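
A minimal sketch of caching at the split point (the source path, column names
and UDF bodies are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("cache-at-split").getOrCreate()

# Stand-ins for the expensive UDFs
expensive_udf1 = udf(lambda s: float(len(s)), DoubleType())
expensive_udf2 = udf(lambda s: float(len(s) * 2), DoubleType())

base = spark.read.parquet("/path/to/source").cache()   # cache at the split point

left = base.select("id", expensive_udf1(col("foo")).alias("c1"))
right = base.select("id", expensive_udf2(col("bar")).alias("c2"))

merged = left.join(right, "id")   # without cache(), each branch re-reads the source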

To give you an example, we had an application that applied a series of rules to 
rows. The output required was a dataframe with an additional column that 
indicated which rule the row satisfied. In our initial implementation, we had a 
series of dataframes, one per rule. For N rules, we created N dataframes that had the 
rows that satisfied the rules. Then we unioned the N data frames. Horrible 
performance that didn't scale with N. We reimplemented to add N Boolean 
columns, one per rule, that indicated if the rule was satisfied. We just kept 
adding the Boolean columns to the dataframe. After iterating over the rules, we 
added another column that indicated which rule was satisfied, and then 
dropped the Boolean columns. Much better performance that scaled with N. Spark 
read from the datasource just once, and since there were no joins/unions, there was 
no shuffle.
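
A minimal sketch of that Boolean-column approach (rule names, predicates and
the source path are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("rules-as-columns").getOrCreate()
df = spark.read.parquet("/path/to/rows")   # illustrative source

# Illustrative rules: name -> predicate over the row's columns
rules = {
    "rule_a": F.col("amount") > 100,
    "rule_b": F.col("country") == "US",
}

# One Boolean column per rule, added to the same dataframe (no splits/unions)
for name, predicate in rules.items():
    df = df.withColumn(name, predicate)

# Single column naming the first matching rule, then drop the Boolean columns
matched = F.coalesce(*[F.when(F.col(n), F.lit(n)) for n in rules])
df = df.withColumn("matched_rule", matched).drop(*rules.keys())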

On 5/17/21, 2:56 PM, "Andrew Melo"  wrote:


In our case, these UDFs are quite expensive and worked on in an
iterative manner, so being able to cache the two "sides" of the graphs
independently will speed up the development cycle. Otherwise, if you
modify foo() here, then you have to recompute bar and baz, even though
they're unchanged.

df.withColumn('a', foo('x')).withColumn('b', bar('x')).withColumn('c', 
baz('x'))

Additionally, a longer goal would be to be able to persist/cache these
columns to disk so a downstream user could later mix and match several
(10s) of these columns together as their inputs w/o having to
explicitly compute them themselves.

Cheers
Andrew

On Mon, May 17, 2021 at 1:10 PM Sean Owen  wrote:
>
> Why join here - just add two columns to the DataFrame directly?
>
> On Mon, May 17, 2021 at 1:04 PM Andrew Melo  wrote:
>>
>> Anyone have ideas about the below Q?
>>
>> It seems to me that, given that "diamond" DAG, Spark could see
>> that the rows haven't been shuffled/filtered and do some type of
>> "zip join" to push them together, but I've not been able to get a plan
>> that doesn't do a hash/sort merge join
>>
>> Cheers
>> Andrew
>>





Re: Merge two dataframes

2021-05-17 Thread Andrew Melo
In our case, these UDFs are quite expensive and worked on in an
iterative manner, so being able to cache the two "sides" of the graphs
independently will speed up the development cycle. Otherwise, if you
modify foo() here, then you have to recompute bar and baz, even though
they're unchanged.

df.withColumn('a', foo('x')).withColumn('b', bar('x')).withColumn('c', baz('x'))

Additionally, a longer goal would be to be able to persist/cache these
columns to disk so a downstream user could later mix and match several
(10s) of these columns together as their inputs w/o having to
explicitly compute them themselves.
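
A minimal sketch of caching the two "sides" independently and materialising them
for later reuse (the source, UDF bodies and output locations are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType
from pyspark import StorageLevel

spark = SparkSession.builder.appName("cache-sides").getOrCreate()
df = spark.read.parquet("/path/to/events")   # illustrative source with columns rownum, x

foo = F.udf(lambda s: len(s), IntegerType())      # stand-ins for the expensive UDFs
bar = F.udf(lambda s: s.upper(), StringType())

# Cache each "side" independently, so editing foo() does not force bar() to recompute
a_side = df.select("rownum", foo("x").alias("a")).persist(StorageLevel.DISK_ONLY)
b_side = df.select("rownum", bar("x").alias("b")).persist(StorageLevel.DISK_ONLY)

# Longer-term goal: materialise the columns so downstream users can mix and match them
a_side.write.mode("overwrite").parquet("/cache/col_a")   # illustrative locations
b_side.write.mode("overwrite").parquet("/cache/col_b")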

Cheers
Andrew

On Mon, May 17, 2021 at 1:10 PM Sean Owen  wrote:
>
> Why join here - just add two columns to the DataFrame directly?
>
> On Mon, May 17, 2021 at 1:04 PM Andrew Melo  wrote:
>>
>> Anyone have ideas about the below Q?
>>
>> It seems to me that, given that "diamond" DAG, Spark could see
>> that the rows haven't been shuffled/filtered and do some type of
>> "zip join" to push them together, but I've not been able to get a plan
>> that doesn't do a hash/sort merge join
>>
>> Cheers
>> Andrew
>>




Re: Merge two dataframes

2021-05-17 Thread Sean Owen
Why join here - just add two columns to the DataFrame directly?

On Mon, May 17, 2021 at 1:04 PM Andrew Melo  wrote:

> Anyone have ideas about the below Q?
>
> It seems to me that, given that "diamond" DAG, Spark could see
> that the rows haven't been shuffled/filtered and do some type of
> "zip join" to push them together, but I've not been able to get a plan
> that doesn't do a hash/sort merge join
>
> Cheers
> Andrew
>
>


Re: Merge two dataframes

2021-05-17 Thread Andrew Melo
Anyone have ideas about the below Q?

It seems to me that, given that "diamond" DAG, Spark could see
that the rows haven't been shuffled/filtered and do some type of
"zip join" to push them together, but I've not been able to get a plan
that doesn't do a hash/sort merge join

Cheers
Andrew

On Wed, May 12, 2021 at 11:32 AM Andrew Melo  wrote:
>
> Hi,
>
> In the case where the left and right hand side share a common parent like:
>
> df = spark.read.someDataframe().withColumn('rownum', row_number())
> df1 = df.withColumn('c1', expensive_udf1('foo')).select('c1', 'rownum')
> df2 = df.withColumn('c2', expensive_udf2('bar')).select('c2', 'rownum')
> df_joined = df1.join(df2, 'rownum', 'inner')
>
> (or maybe replacing row_number() with monotonically_increasing_id())
>
> Is there some hint/optimization that can be done to let Spark know
> that the left and right hand-sides of the join share the same
> ordering, and a sort/hash merge doesn't need to be done?
>
> Thanks
> Andrew
>
> On Wed, May 12, 2021 at 11:07 AM Sean Owen  wrote:
> >
> > Yeah I don't think that's going to work - you aren't guaranteed to get 1, 
> > 2, 3, etc. I think row_number() might be what you need to generate a join 
> > ID.
> >
> > RDD has a .zip method, but (unless I'm forgetting!) DataFrame does not. You 
> > could .zip two RDDs you get from DataFrames and manually convert the Rows 
> > back to a single Row and back to DataFrame.
> >
> >
> > On Wed, May 12, 2021 at 10:47 AM kushagra deep  
> > wrote:
> >>
> >> Thanks Raghvendra
> >>
> >> Will the ids for corresponding columns  be same always ? Since 
> >> monotonic_increasing_id() returns a number based on partitionId and the 
> >> row number of the partition  ,will it be same for corresponding columns? 
> >> Also is it guaranteed that the two dataframes will be divided into logical 
> >> spark partitions with the same cardinality for each partition ?
> >>
> >> Reg,
> >> Kushagra Deep
> >>
> >> On Wed, May 12, 2021, 21:00 Raghavendra Ganesh  
> >> wrote:
> >>>
> >>> You can add an extra id column and perform an inner join.
> >>>
> >>> val df1_with_id = df1.withColumn("id", monotonically_increasing_id())
> >>>
> >>> val df2_with_id = df2.withColumn("id", monotonically_increasing_id())
> >>>
> >>> df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()
> >>>
> >>> +-+-+
> >>>
> >>> |amount_6m|amount_9m|
> >>>
> >>> +-+-+
> >>>
> >>> |  100|  500|
> >>>
> >>> |  200|  600|
> >>>
> >>> |  300|  700|
> >>>
> >>> |  400|  800|
> >>>
> >>> |  500|  900|
> >>>
> >>> +-+-+
> >>>
> >>>
> >>> --
> >>> Raghavendra
> >>>
> >>>
> >>> On Wed, May 12, 2021 at 6:20 PM kushagra deep  
> >>> wrote:
> 
>  Hi All,
> 
>  I have two dataframes
> 
>  df1
> 
>  amount_6m
>   100
>   200
>   300
>   400
>   500
> 
>  And a second data df2 below
> 
>   amount_9m
>    500
>    600
>    700
>    800
>    900
> 
>  The number of rows is same in both dataframes.
> 
>  Can I merge the two dataframes to achieve below df
> 
>  df3
> 
>  amount_6m | amount_9m
>  100   500
>   200  600
>   300  700
>   400  800
>   500  900
> 
>  Thanks in advance
> 
>  Reg,
>  Kushagra Deep
> 
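
A minimal sketch of the RDD .zip route Sean mentions above, using the two example
dataframes (note that .zip requires both sides to have the same number of
partitions and the same number of rows in each partition):

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("zip-merge").getOrCreate()

df1 = spark.createDataFrame([(100,), (200,), (300,), (400,), (500,)], ["amount_6m"])
df2 = spark.createDataFrame([(500,), (600,), (700,), (800,), (900,)], ["amount_9m"])

# Both dataframes are parallelised the same way here, so .zip pairs rows positionally
zipped = df1.rdd.zip(df2.rdd).map(
    lambda pair: Row(**pair[0].asDict(), **pair[1].asDict()))

df3 = spark.createDataFrame(zipped)   # columns: amount_6m, amount_9m
df3.show()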




Spark History Server to S3 doesn't show up incomplete jobs

2021-05-17 Thread Tianbin Jiang
Hi all,
 I am using Spark 2.4.5. I am redirecting the spark event logs to S3
with the following configuration:

spark.eventLog.enabled = true
spark.history.ui.port = 18080
spark.eventLog.dir = s3://livy-spark-log/spark-history/
spark.history.fs.logDirectory = s3://livy-spark-log/spark-history/
spark.history.fs.update.interval = 5s


Once my application is completed, I can see it shows up on the spark
history server. However, running applications don't show up under
"incomplete applications". I have also checked the log; whenever my
application ends, I can see these messages:

21/05/17 06:14:18 INFO k8s.KubernetesClusterSchedulerBackend: Shutting down
all executors
21/05/17 06:14:18 INFO
k8s.KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each
executor to shut down
21/05/17 06:14:18 WARN k8s.ExecutorPodsWatchSnapshotSource: Kubernetes
client has been closed (this is expected if the application is shutting
down.)
21/05/17 06:14:18 INFO s3n.MultipartUploadOutputStream: close closed:false
s3://livy-spark-log/spark-history/spark-48c3141875fe4c67b5708400134ea3d6.inprogress
21/05/17 06:14:19 INFO s3n.S3NativeFileSystem: rename
s3://livy-spark-log/spark-history/spark-48c3141875fe4c67b5708400134ea3d6.inprogress
s3://livy-spark-log/spark-history/spark-48c3141875fe4c67b5708400134ea3d6
21/05/17 06:14:19 INFO spark.MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
21/05/17 06:14:19 INFO memory.MemoryStore: MemoryStore cleared
21/05/17 06:14:19 INFO storage.BlockManager: BlockManager stopped


I am not able to see any xx.inprogress file on S3 though. Anyone had this
problem before?

-- 

Sincerely:
 Tianbin Jiang


Re: Calculate average from Spark stream

2021-05-17 Thread Mich Talebzadeh
Hi Giuseppe ,

How have you defined your resultM above in qK?

Cheers



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 17 May 2021 at 17:18, Giuseppe Ricci  wrote:

> Hi Mitch,
>
> thanks for your extraordinary support.
> Your previous code worked well... but I got the error from my previous mail
> when writing the average temperature to the Kafka topic avgtemperature.
> There is some error in this code:
>
> qk = (resultM.
>   selectExpr("CAST(timestamp AS STRING)", "CAST(temperature AS
> STRING)") \
>   .writeStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "localhost:9092") \
>   .option('checkpointLocation',
> "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream")
> \
>   .option("topic", "avgtemperature") \
>   .start())
>
> I hope it is clear.
> Thanks.
>
>
>
> PhD. Giuseppe Ricci
>
>
>
> On Mon 17 May 2021 at 16:33, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Giuseppe,
>>
>> Your error states --> Required attribute 'value' not found
>>
>> First can you read your streaming data OK?
>>
>> Here my streaming data is in json format. I have three columns in the json.
>>
>> example:
>>
>> {"rowkey":"f0577406-a7d3-4c52-9998-63835ea72133",
>> "timestamp":"2021-05-17T15:17:27", "temperature":27}
>>
>> The first column is UUID, the second is timestamp and third is
>> temperature.
>>
>> I need to tell SSS how the columns are formatted
>>
>> I define the schema as follows:
>>
>>  schema = StructType().add("rowkey",
>> StringType()).add("timestamp", TimestampType()).add("temperature",
>> IntegerType())
>>checkpoint_path = "file:///ssd/hduser/temperature2/chkpt"
>> try:
>>
>> # construct a streaming dataframe streamingDataFrame that
>> subscribes to topic temperature
>> streamingDataFrame = self.spark \
>> .readStream \
>> .format("kafka") \
>> .option("kafka.bootstrap.servers",
>> config['MDVariables']['bootstrapServers'],) \
>> .option("schema.registry.url",
>> config['MDVariables']['schemaRegistryURL']) \
>> .option("group.id", config['common']['appName']) \
>> .option("zookeeper.connection.timeout.ms",
>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>> .option("rebalance.backoff.ms",
>> config['MDVariables']['rebalanceBackoffMS']) \
>> .option("zookeeper.session.timeout.ms",
>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>> .option("auto.commit.interval.ms",
>> config['MDVariables']['autoCommitIntervalMS']) \
>> .option("subscribe", "temperature") \
>> .option("failOnDataLoss", "false") \
>> .option("includeHeaders", "true") \
>> .option("startingOffsets", "latest") \
>> .load() \
>> .select(from_json(col("value").cast("string"),
>> schema).alias("parsed_value"))  ## note the value here
>>
>> ## get the individual columns from schema
>> resultM = streamingDataFrame.select( \
>>  col("parsed_value.rowkey").alias("rowkey") \
>>, col("parsed_value.timestamp").alias("timestamp") \
>>, col("parsed_value.temperature").alias("temperature"))
>>
>> ## Here I do my windowing and tell that I am interested in
>> avg("temperature") over timestamp
>>
>> result = resultM. \
>>  withWatermark("timestamp", "5 minutes"). \
>>  groupBy(window(resultM.timestamp, "5 minutes", "5
>> minutes")). \
>>  avg('temperature'). \
>>  writeStream. \
>>  outputMode('complete'). \
>>  option("numRows", 1000). \
>>  option("truncate", "false"). \
>>  format('console'). \
>>  option('checkpointLocation', checkpoint_path). \
>>  queryName("temperature"). \
>>  start()
>>
>> except Exception as e:
>> print(f"""{e}, quitting""")
>> sys.exit(1)
>>
>> #print(result.status)
>> #print(result.recentProgress)
>> #print(result.lastProgress)
>>
>> result.awaitTermination()
>>
>>  This works. I attach the py code for you. Have a look at it
>>
>> HTH
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *D

Re: Calculate average from Spark stream

2021-05-17 Thread Mich Talebzadeh
Hi Giuseppe,

Your error states --> Required attribute 'value' not found

First can you read your streaming data OK?

Here my streaming data is in json format. I have three columns in the json.

example:

{"rowkey":"f0577406-a7d3-4c52-9998-63835ea72133",
"timestamp":"2021-05-17T15:17:27", "temperature":27}

The first column is UUID, the second is timestamp and third is temperature.

I need to tell SSS how the columns are formatted

I define the schema as follows:

 schema = StructType().add("rowkey",
StringType()).add("timestamp", TimestampType()).add("temperature",
IntegerType())
   checkpoint_path = "file:///ssd/hduser/temperature2/chkpt"
try:

# construct a streaming dataframe streamingDataFrame that
subscribes to topic temperature
streamingDataFrame = self.spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",
config['MDVariables']['bootstrapServers'],) \
.option("schema.registry.url",
config['MDVariables']['schemaRegistryURL']) \
.option("group.id", config['common']['appName']) \
.option("zookeeper.connection.timeout.ms",
config['MDVariables']['zookeeperConnectionTimeoutMs']) \
.option("rebalance.backoff.ms",
config['MDVariables']['rebalanceBackoffMS']) \
.option("zookeeper.session.timeout.ms",
config['MDVariables']['zookeeperSessionTimeOutMs']) \
.option("auto.commit.interval.ms",
config['MDVariables']['autoCommitIntervalMS']) \
.option("subscribe", "temperature") \
.option("failOnDataLoss", "false") \
.option("includeHeaders", "true") \
.option("startingOffsets", "latest") \
.load() \
.select(from_json(col("value").cast("string"),
schema).alias("parsed_value"))  ## note the value here

## get the individual columns from schema
resultM = streamingDataFrame.select( \
 col("parsed_value.rowkey").alias("rowkey") \
   , col("parsed_value.timestamp").alias("timestamp") \
   , col("parsed_value.temperature").alias("temperature"))

## Here I do my windowing and tell that I am interested in
avg("temperature") over timestamp

result = resultM. \
 withWatermark("timestamp", "5 minutes"). \
 groupBy(window(resultM.timestamp, "5 minutes", "5
minutes")). \
 avg('temperature'). \
 writeStream. \
 outputMode('complete'). \
 option("numRows", 1000). \
 option("truncate", "false"). \
 format('console'). \
 option('checkpointLocation', checkpoint_path). \
 queryName("temperature"). \
 start()

except Exception as e:
print(f"""{e}, quitting""")
sys.exit(1)

#print(result.status)
#print(result.recentProgress)
#print(result.lastProgress)

result.awaitTermination()

 This works. I attach the py code for you. Have a look at it
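
On the related Kafka-sink question: the Kafka writer needs a single string or
binary column named value, which is why "Required attribute 'value' not found"
is raised when only timestamp and temperature are selected. A minimal sketch,
packing the windowed averages into JSON (the checkpoint path is illustrative):

from pyspark.sql.functions import avg, window, to_json, struct, col

# resultM is the parsed dataframe defined above
avg_df = resultM \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window(col("timestamp"), "5 minutes", "5 minutes")) \
    .agg(avg("temperature").alias("avg_temperature"))

qk = avg_df \
    .select(to_json(struct("window", "avg_temperature")).alias("value")) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/avgtemperature-chkpt") \
    .option("topic", "avgtemperature") \
    .outputMode("complete") \
    .start()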

HTH



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 17 May 2021 at 15:00, Giuseppe Ricci  wrote:

> Hi Mich, Hi all,
>
> Thank you for your precious support... it seems your solution worked!
>
> 21/05/17 15:53:38 WARN HDFSBackedStateStoreProvider: The state for version
> 83 doesn't exist in loadedMaps. Reading snapshot file and delta files if
> needed...Note that this is normal for the first batch of starting query.
> ---
> Batch: 83
> ---
> +--+--+
> |window|avg(temperature)  |
> +--+--+
> |{2021-05-13 15:02:30, 2021-05-13 15:02:40}|11.9084741211 |
> |{2021-05-14 16:04:20, 2021-05-14 16:04:30}|12.85656677246|
> |{2021-05-13 16:04:10, 2021-05-13 16:04:20}|18.64618530273|
> |{2021-05-14 16:03:30, 2021-05-14 16:03:40}|18.54915527344|
> |{2021-05-13 16:01:10, 2021-05-13 16:01:20}|19.88389648438|
> |{2021-05-13 16:01:50, 2021-05-13 16:02:00}|16.48771118164|
> |{2021-05-14 16:02:30, 2021-05-14 16:02:40}|13.64343322754|
>
>
> I tried to save the data to another Kafka topic but my solution doesn't work:
>
> qk = (resultM.
>   selectExpr("CAST(timestamp AS STRING)", "CAST(temperature AS