Hi Jorn, We are using Spark 2.2.0 for our development. Below is the code snippet for your reference:
var newDf = data.repartition(col(userid)).sortWithinPartitions(sid,time) newDf.write.format("parquet").saveAsTable("tempData") newDf.coalesce(1).write.format(outputFormat).option("header", "true").save(hdfsUri + destFilePath) var groupedData = newDf.rdd.map { x => (x.get(0),x)}.groupByKey(); //Get schema fields of dataframe var structFieldArray:Array[StructField] = newDf.schema.fields //Create Map for storing Dataframe's columnName,ColumnNumber and their dataType var i=0 val cache = collection.mutable.Map[String, DataFrameBO]() for(structField<-structFieldArray) { val dataFrameBO = new DataFrameBO(i,structField.name,structField.dataType.typeName) cache.put(structField.name, dataFrameBO) i = i + 1 } var dfWithoutDuplicateRows = groupedData.mapValues { x => { var ls:List[Row]=List() var linkedMap = collection.mutable.Map[String, String]() val linkedSid =ArrayBuffer.empty[String] x.foreach { y => { var subpathId = y(cache(sid).columnNumber) var salesTimeColumn = y(cache(time).columnNumber) var orderId = y(cache(orderIdColumnName).columnNumber) var seq = ArrayBuffer[Any]() for(i <- 0 to (y.size - 2)){ seq += y(i) } if(!linkedSid.contains(y(cache(sid).columnNumber))) { if(linkedMap.exists(x => x._1.equals(y(cache(time).columnNumber)) && x._2.equals(y(cache(orderIdColumnName).columnNumber)))) { seq += 0 // Appends 0 to rows which needs to be deleted. } else { linkedSid += y(cache(sid).columnNumber).toString() linkedMap.put(y(cache(time).columnNumber).toString(),y(cache(orderIdColumnName).columnNumber).toString()) seq += 1 // Appends 1 to rows which need not be deleted. } } else { seq += 0 // Appends 0 to rows which needs to be deleted. } ls::= Row.fromSeq(seq) }} ls }} var flatDataframe = dfWithoutDuplicateRows.values.flatMap { x => {x} } var finalDF = data.sqlContext.createDataFrame(flatDataframe, newDf.schema) finalDF should have picked up data on first cum first basis and updated the flag accordingly. Please let me know if you need any other information regarding the same. From: Jörn Franke [mailto:jornfra...@gmail.com] Sent: Monday, June 4, 2018 10:59 PM To: Jain, Neha T. <neha.t.j...@accenture.com<mailto:neha.t.j...@accenture.com>> Cc: user@spark.apache.org<mailto:user@spark.apache.org>; Patel, Payal <payal.pa...@accenture.com<mailto:payal.pa...@accenture.com>>; Sing, Jasbir <jasbir.s...@accenture.com<mailto:jasbir.s...@accenture.com>> Subject: Re: [External] Re: Sorting in Spark on multiple partitions I think also there is a misunderstanding how repartition works. It keeps the existing number of partitions, but hash partitions according to userid. Means in each partition it is likely to have different user ids. That would also explain your observed behavior. However without having the full source code these are just assumptions. On 4. Jun 2018, at 17:33, Jain, Neha T. <neha.t.j...@accenture.com<mailto:neha.t.j...@accenture.com>> wrote: Hi Jorn, I tried removing userid from my sort clause but still the same issue- data not sorted. var newDf = data.repartition(col(userid)).sortWithinPartitions(sid,time) I am checking the sorting results by temporary writing this file to Hive as well as HDFS. Now, when I see the user wise data it is not sorted. Attaching the output file for your reference. On the basis of sorting within userid partitions, I want to add a flag which marks first item in the partition as true other items in that partition as false. If my sorting order is disturbed, the flag is wrongly set. Please suggest what else could be done to fix this very basic scenario of sorting in Spark across multiple partitions across multiple nodes. Thanks & Regards, Neha Jain From: Jörn Franke [mailto:jornfra...@gmail.com] Sent: Monday, June 4, 2018 10:48 AM To: Sing, Jasbir <jasbir.s...@accenture.com<mailto:jasbir.s...@accenture.com>> Cc: user@spark.apache.org<mailto:user@spark.apache.org>; Patel, Payal <payal.pa...@accenture.com<mailto:payal.pa...@accenture.com>>; Jain, Neha T. <neha.t.j...@accenture.com<mailto:neha.t.j...@accenture.com>> Subject: [External] Re: Sorting in Spark on multiple partitions You partition by userid, why do you then sort again by userid in the partition? Can you try to remove userid from the sort? How do you check if the sort is correct or not? What is the underlying objective of the sort? Do you have more information on schema and data? On 4. Jun 2018, at 05:47, Sing, Jasbir <jasbir.s...@accenture.com<mailto:jasbir.s...@accenture.com>> wrote: Hi Team, We are currently using Spark 2.2.0 and facing some challenges in sorting of data on multiple partitions. We have tried below approaches: 1. Spark SQL approach: a. var query = "select * from data distribute by " + userid + " sort by " + userid + ", " + time “ This query returns correct results in Hive but not in Spark SQL. 1. var newDf = data.repartition(col(userud)).orderBy(userid, time) 2. var newDf = data.repartition(col(userid)).sortWithinPartitions(userid,time) But none of the above approach is giving correct results for sorting of data. Please suggest what could be done for the same. Thanks & Regards, Neha Jain ________________________________ This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy. Your privacy is important to us. Accenture uses your personal data only in compliance with data protection laws. For further information on how Accenture processes your personal data, please see our privacy statement at https://www.accenture.com/us-en/privacy-policy. ______________________________________________________________________________________ www.accenture.com<http://www.accenture.com> <test.csv>