Re: Convert each partition of RDD to Dataframe

2020-02-28 Thread Manjunath Shetty H
Hi Enrico, Thanks for the suggestion. I wanted to know if there are any performance implications of running a multi-threaded driver? If I create multiple DataFrames in parallel, will Spark schedule those jobs in parallel? Thanks, Manjunath
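
A minimal sketch of what such a multi-threaded driver might look like; the paths, the parquet source and the count() action are illustrative assumptions. Spark does accept actions submitted from separate driver threads and schedules the resulting jobs concurrently, subject to available executors and the configured scheduling mode (FIFO or FAIR).

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each Future runs an action on its own driver thread, so the jobs are
// submitted (and can run) concurrently. `paths` is a hypothetical input.
val paths = Seq("/data/part1", "/data/part2", "/data/part3")

val jobs = paths.map { p =>
  Future {
    spark.read.parquet(p).count()   // assumes `spark` is the active SparkSession
  }
}

val counts = Await.result(Future.sequence(jobs), Duration.Inf)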

Fwd: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Hi Tathagata. I tried making changes as you suggested: @SQLUserDefinedType(udt = classOf[JodaTimeUDT]) class JodaTimeUDT extends UserDefinedType[DateTime] { override def sqlType: DataType = TimestampType override def serialize(obj: DateTime): Long = { obj.getMillis } def
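
For reference, a self-contained sketch in the same spirit (not the exact class above): it stores raw epoch millis as a LongType and keeps serialize/deserialize symmetric in UTC. The UDT API is noted as private/deprecated in the docs linked later in this thread, so this assumes it is reachable from your build via whatever workaround you already use.

import org.apache.spark.sql.types._
import org.joda.time.{DateTime, DateTimeZone}

// Sketch only: LongType (raw epoch millis) avoids any mismatch between
// getMillis and TimestampType's internal representation; deserialize
// rebuilds the DateTime in UTC so the round trip is symmetric.
class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = LongType
  override def serialize(obj: DateTime): Long = obj.getMillis
  override def deserialize(datum: Any): DateTime = datum match {
    case millis: Long => new DateTime(millis, DateTimeZone.UTC)
  }
  override def userClass: Class[DateTime] = classOf[DateTime]
}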

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Perfect. I'll give this a shot and report back.

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Tathagata Das
Sounds like something to do with the serialization/deserialization, and not related to mapGroupsWithState. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala The docs say that 1. this is deprecated and therefore should not be
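
One hedged way to confirm the problem sits in the UDT itself rather than in mapGroupsWithState is a plain round-trip check outside any streaming code (JodaTimeUDT below refers to the sketch earlier in the thread):

import org.joda.time.{DateTime, DateTimeZone}

// Round-trip the value through the UDT directly; if this already loses
// or shifts the timestamp, the streaming side is not the culprit.
val udt = new JodaTimeUDT()
val original = new DateTime(2020, 2, 28, 12, 0, DateTimeZone.UTC)
val roundTripped = udt.deserialize(udt.serialize(original))
assert(roundTripped == original, s"expected $original, got $roundTripped")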

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Tathagata, The difference is more than hours off. In this instance it's different by 4 years. In other instances it's different by tens of years (and other smaller durations). We've considered moving to storing the values as longs, but this makes the code much less readable and harder to maintain. The UDT

Aggregating values by a key field in Spark Streaming

2020-02-28 Thread Something Something
Here's my use case: Messages are coming into a Kafka Topic for different 'Events'. Each event has a unique Event Id. I need to aggregate counts for each Event AFTER the event is completed. For now, we are thinking we can assume an event is completed if there are no more messages coming in for a
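
A minimal sketch of one way to model "no more messages for a while" with flatMapGroupsWithState and a processing-time timeout; EventMsg, the 30-minute gap and the parsed Kafka source are illustrative assumptions.

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._   // assumes the active SparkSession `spark`

case class EventMsg(eventId: String)
case class EventCount(eventId: String, count: Long)

// Count messages per eventId; when no new messages arrive for the
// configured gap, the state times out and the final count is emitted.
def update(eventId: String, msgs: Iterator[EventMsg],
           state: GroupState[Long]): Iterator[EventCount] = {
  if (state.hasTimedOut) {
    val finalCount = Iterator(EventCount(eventId, state.get))
    state.remove()
    finalCount
  } else {
    state.update(state.getOption.getOrElse(0L) + msgs.size)
    state.setTimeoutDuration("30 minutes")
    Iterator.empty
  }
}

val completedCounts = parsedEvents       // Dataset[EventMsg] parsed from Kafka (assumed)
  .groupByKey(_.eventId)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.ProcessingTimeTimeout)(update)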

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Tathagata Das
You are deserializing by explicitly specifying the UTC timezone, but when serializing you are not specifying it. Maybe that is the reason? Also, if you can encode it using just a long, then I recommend just saving the value as a long and eliminating some of the serialization overheads. Spark will probably
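
A minimal sketch of the "just use a long" option: keep the column as epoch millis inside the Dataset and convert to/from Joda only at the edges (the names below are assumptions).

import org.joda.time.{DateTime, DateTimeZone}

// Epoch millis are timezone-free, so storing them as a plain Long avoids
// the asymmetric-timezone problem entirely; Joda is only used at the edges.
case class Event(id: String, eventTimeMillis: Long)

def toJoda(e: Event): DateTime = new DateTime(e.eventTimeMillis, DateTimeZone.UTC)
def fromJoda(id: String, dt: DateTime): Event = Event(id, dt.getMillis)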

Pyspark Convert Struct Type to Map Type

2020-02-28 Thread anbutech
Hello Sir, Could you please advise on the below scenario in PySpark 2.4.3 on Databricks, loading the data into a Delta table. I want to load the dataframe with this column "data" into the table as a Map type in the Databricks Spark Delta table. Could you please advise on this scenario, how to
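
Without knowing the exact schema, a hedged sketch of one way to do this is to round-trip the struct through JSON into a MapType. It is shown in Scala to match the rest of the code in this digest; to_json and from_json are also available in pyspark.sql.functions. The column name, value type and table name are assumptions.

import org.apache.spark.sql.functions.{col, from_json, to_json}
import org.apache.spark.sql.types.{MapType, StringType}

// Turn the struct column "data" into a map<string,string> by serializing it
// to JSON and parsing it back with a MapType schema.
val withMap = df.withColumn(
  "data",
  from_json(to_json(col("data")), MapType(StringType, StringType)))

withMap.write.format("delta").mode("append").saveAsTable("target_table")  // assumed table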

Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Bryan Jeffrey
Hello. I'm running Scala 2.11 w/ Spark 2.3.0. I've encountered a problem with mapGroupsWithState, and was wondering if anyone had insight. We use Joda time in a number of data structures, and so we've generated a custom serializer for Joda. This works well in most dataset/dataframe structured
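
For context, a hedged sketch of the shape being described: state carrying a Joda DateTime through mapGroupsWithState. The names are assumptions, and whether the DateTime survives the round trip is exactly what this thread is about.

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.joda.time.DateTime

// Relies on the Joda UDT/encoder being registered so that Msg and MsgState
// can be encoded; the logic itself just tracks a count and the latest time.
case class Msg(key: String, time: DateTime)
case class MsgState(key: String, lastSeen: DateTime, count: Long)

def track(key: String, msgs: Iterator[Msg], state: GroupState[MsgState]): MsgState = {
  val seen = msgs.toSeq
  val latest = seen.map(_.time).maxBy(_.getMillis)
  val updated = state.getOption
    .map(s => s.copy(lastSeen = latest, count = s.count + seen.size))
    .getOrElse(MsgState(key, latest, seen.size))
  state.update(updated)
  updated
}

val tracked = msgStream           // Dataset[Msg] (assumed)
  .groupByKey(_.key)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(track)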

Re: Compute the Hash of each row in new column

2020-02-28 Thread Enrico Minack
This computes the md5 hash of a given column id of Dataset ds: ds.withColumn("id hash", md5($"id")).show(false) Test with this Dataset ds: import org.apache.spark.sql.types._ val ds = spark.range(10).select($"id".cast(StringType)) Available are md5, sha, sha1, sha2 and hash:
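
A hedged follow-up sketch extending this to the original question of hashing a whole row rather than a single column; the column set and separator are arbitrary choices.

import org.apache.spark.sql.functions.{col, concat_ws, hash, md5, sha2}

// md5/sha2 operate on a single string/binary column, so the row's columns are
// concatenated first; hash() takes multiple columns directly but returns an Int.
val withRowHashes = ds
  .withColumn("row md5",  md5(concat_ws("|", ds.columns.map(col): _*)))
  .withColumn("row sha2", sha2(concat_ws("|", ds.columns.map(col): _*), 256))
  .withColumn("row hash", hash(ds.columns.map(col): _*))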

Re: Compute the Hash of each row in new column

2020-02-28 Thread Riccardo Ferrari
Hi Chetan, Would the sql function `hash` do the trick for your use case? Best,

Compute the Hash of each row in new column

2020-02-28 Thread Chetan Khatri
Hi Spark Users, How can I compute the hash of each row and store it in a new column of a DataFrame? Could someone help me. Thanks

Re: dropDuplicates and watermark in structured streaming

2020-02-28 Thread Tathagata Das
Why do you have two watermarks? Once you apply the watermark to a column (i.e., "time"), it can be used in all later operations as long as the column is preserved. So the above code should be equivalent to df.withWatermark("time","window size").dropDuplicates("id").groupBy(window("time","window
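
A hedged sketch of the single-watermark version described above, with "10 minutes" and the column names as illustrative placeholders:

import org.apache.spark.sql.functions.{col, window}

val counts = df
  .withWatermark("time", "10 minutes")
  // include the event-time column in the keys as well if you want the
  // deduplication state itself to be purged by the watermark
  .dropDuplicates("id")
  .groupBy(window(col("time"), "10 minutes"))
  .count()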