Re: [pyspark 2.4+] BucketBy SortBy doesn't retain sort order
Hi All,

Just checking in to see if anyone has any advice on this.

Thanks,
Rishi

On Mon, Mar 2, 2020 at 9:21 PM Rishi Shah wrote:
> Hi All,
>
> I have 2 large tables (~1 TB each). I used the following to save both
> tables. Then when I try to join the two tables on join_column, it still
> does a shuffle & sort before the join. Could someone please help?
>
> df.repartition(2000).write.bucketBy(1, join_column).sortBy(join_column).saveAsTable(tablename)
>
> --
> Regards,
> Rishi Shah

--
Regards,
Rishi Shah
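For what it's worth, a minimal Scala sketch of the usual setup (the original post uses PySpark; the DataFrames dfA/dfB, the table names, and the bucket count 200 below are placeholders, not the poster's values): the shuffle is only avoided when both tables are bucketed on the join key with the same number of buckets, and the sort is more likely to be avoided when each bucket is written as a single file, e.g. by repartitioning on the join key before the write. Depending on the Spark version, a sort may still show up in the plan even then, so verify with explain().

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("bucketed-join-sketch")
  .enableHiveSupport()
  .getOrCreate()

// Write each table with the same bucket count on the join key. Repartitioning
// on the join key first gives one task (and hence one file) per bucket, so the
// per-bucket sort order written by sortBy is more likely to be usable on read.
def writeBucketed(df: DataFrame, table: String): Unit = {
  df.repartition(200, col("join_column"))
    .write
    .bucketBy(200, "join_column")
    .sortBy("join_column")
    .saveAsTable(table)
}

writeBucketed(dfA, "table_a")   // dfA / dfB stand in for the two ~1 TB DataFrames
writeBucketed(dfB, "table_b")

// Joining the bucketed tables on the bucketing column should plan a
// SortMergeJoin without an Exchange on either side; check the plan.
val joined = spark.table("table_a").join(spark.table("table_b"), "join_column")
joined.explain()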
Stateful Spark Streaming: Required attribute 'value' not found
In a stateful Spark Structured Streaming application I am producing an 'OutputRow' in 'updateAcrossEvents', but I keep getting this error (*Required attribute 'value' not found*) when it tries to write to Kafka. I know from the documentation that the 'value' attribute needs to be set, but how do I do that in stateful Structured Streaming? Where and how do I add this 'value' attribute in the following code?

*Note: I am using Spark 2.3.1*

withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()
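One hedged way to satisfy the Kafka sink (a sketch, not necessarily the only approach): the sink requires a string or binary column named 'value', so the OutputRow returned by updateAcrossEvents can be serialized to JSON and aliased as 'value' just before writeStream. Here withEventTime, R00tJsonObject and updateAcrossEvents are the definitions from the post above:

import org.apache.spark.sql.functions.{col, struct, to_json}

withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  // Pack every field of the OutputRow into a single JSON string column named
  // "value", which is what the Kafka sink looks for.
  .select(to_json(struct(col("*"))).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()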
Example of Stateful Spark Structured Streaming with Kafka
There are lots of examples of 'Stateful Structured Streaming' in 'The Definitive Guide' book, but all of them read JSON from a 'path', and that works for me. Now I need to read from Kafka. I Googled but couldn't find any example. I am struggling to map the 'value' of the Kafka message to my JSON. Any help would be appreciated.

Here's what I am trying:

val query = withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.report.id, row.report.time.toString, row.report.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()

It fails with:

cannot resolve 'arrivalTime' given input columns: [value, event_time];
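A hedged sketch of the reading side, assuming R00tJsonObject is the case class from the post and "inputTopic" is a placeholder topic name: the Kafka source only exposes the payload as a binary 'value' column (plus key, offset, timestamp, etc.), so it has to be cast to a string and parsed with from_json against the case-class schema before the JSON fields are visible to .as[R00tJsonObject] and withWatermark. That is consistent with the error, which shows only [value, event_time] as available columns.

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{col, from_json}

// Derive the schema from the case class so from_json can parse the payload.
val schema = Encoders.product[R00tJsonObject].schema

val withEventTime = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "inputTopic")          // placeholder topic name
  .load()
  .select(from_json(col("value").cast("string"), schema).alias("parsed"))
  .select("parsed.*")                         // flatten so the JSON fields, including the event-time column, become top-level columns

// From here the original pipeline (.as[R00tJsonObject], withWatermark,
// groupByKey, mapGroupsWithState, ...) can be applied as in the post.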
Re: How to collect Spark dataframe write metrics
Hi,

To get DataFrame-level write metrics you can take a look at the following trait:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala

a basic implementation example:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala

and here is an example of how it is used in FileStreamSink:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L178

About whether it is good practice: it depends on your use case, but generally speaking I would not do it, at least not for checking your logic or checking that Spark is working correctly.

On Sun, Mar 1, 2020 at 14:32 Manjunath Shetty H <manjunathshe...@live.com> wrote:
> Hi all,
>
> Basically my use case is to validate the DataFrame row count before and
> after writing to HDFS. Is this even good practice? Or should I rely on
> Spark for guaranteed writes?
>
> If it is a good practice to follow, then how do I get the DataFrame-level
> write metrics?
>
> Any pointers would be helpful.
>
> Thanks and Regards
> Manjunath
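If wiring up a WriteStatsTracker feels too heavy (it is internal API plugged in through the file-writing path), a simpler alternative sketch, assuming the write goes through the usual file-based sinks, is a SparkListener that sums the per-task output metrics; the total can then be compared with a df.count() taken before the write. The path and variable names below are illustrative only.

import java.util.concurrent.atomic.LongAdder
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sums the records written by every task in the application.
class RecordsWrittenListener extends SparkListener {
  val recordsWritten = new LongAdder

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks
    Option(taskEnd.taskMetrics).foreach { m =>
      recordsWritten.add(m.outputMetrics.recordsWritten)
    }
  }
}

// Usage sketch: the listener counts every job in the application, so scope it
// around the single write being validated (or reset between writes).
val listener = new RecordsWrittenListener
spark.sparkContext.addSparkListener(listener)
df.write.mode("overwrite").parquet("hdfs:///tmp/validation_example")
println(s"records written: ${listener.recordsWritten.sum()}")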