Hi Jorn,

We are using Spark 2.2.0 for our development.
Below is the code snippet for your reference:

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructField

var newDf = data.repartition(col(userid)).sortWithinPartitions(sid, time)
newDf.write.format("parquet").saveAsTable("tempData")
newDf.coalesce(1).write.format(outputFormat).option("header", "true").save(hdfsUri + destFilePath)

var groupedData = newDf.rdd.map { x => (x.get(0), x) }.groupByKey()

// Get the schema fields of the DataFrame.
var structFieldArray: Array[StructField] = newDf.schema.fields

// Map each column name to its column number and data type.
val cache = mutable.Map[String, DataFrameBO]()
var i = 0
for (structField <- structFieldArray) {
  cache.put(structField.name, new DataFrameBO(i, structField.name, structField.dataType.typeName))
  i = i + 1
}

var dfWithoutDuplicateRows = groupedData.mapValues { rows =>
  var ls: List[Row] = List()
  val linkedMap = mutable.Map[String, String]() // (time -> orderId) pairs already seen
  val linkedSid = ArrayBuffer.empty[String]     // sid values already seen

  rows.foreach { y =>
    // Compare as strings throughout, so the lookups below stay consistent.
    val subpathId = y(cache(sid).columnNumber).toString
    val salesTime = y(cache(time).columnNumber).toString
    val orderId = y(cache(orderIdColumnName).columnNumber).toString

    // Copy every column of the row except the last one.
    val seq = ArrayBuffer[Any]()
    for (i <- 0 to (y.size - 2)) {
      seq += y(i)
    }

    if (!linkedSid.contains(subpathId)) {
      if (linkedMap.exists(e => e._1.equals(salesTime) && e._2.equals(orderId))) {
        seq += 0 // Appends 0 to rows which need to be deleted.
      } else {
        linkedSid += subpathId
        linkedMap.put(salesTime, orderId)
        seq += 1 // Appends 1 to rows which need not be deleted.
      }
    } else {
      seq += 0 // Appends 0 to rows which need to be deleted.
    }

    ls ::= Row.fromSeq(seq)
  }
  ls
}

var flatDataframe = dfWithoutDuplicateRows.values.flatMap(identity)
var finalDF = data.sqlContext.createDataFrame(flatDataframe, newDf.schema)

finalDF should have picked up the data on a first-come, first-served basis and
updated the flag accordingly.
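
For comparison, a window-function sketch of the same first-come, first-served
flag (my simplification, not the code above: it flags the first row per
(userid, sid) in time order and does not reproduce the extra (time, orderId)
duplicate check):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, when}

// The first row per (userid, sid), ordered by time, gets flag 1; all others 0.
val w = Window.partitionBy(col(userid), col(sid)).orderBy(col(time))
val flagged = newDf.withColumn("keepFlag",
  when(row_number().over(w) === 1, 1).otherwise(0))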
Please let me know if you need any other information regarding the same.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Monday, June 4, 2018 10:59 PM
To: Jain, Neha T. <neha.t.j...@accenture.com>
Cc: user@spark.apache.org; Patel, Payal <payal.pa...@accenture.com>; Sing, Jasbir <jasbir.s...@accenture.com>
Subject: Re: [External] Re: Sorting in Spark on multiple partitions

I think there is also a misunderstanding of how repartition works. It does not
create one partition per userid; it hash-partitions the rows by userid across
a fixed number of partitions, which means each partition is likely to contain
several different user ids.

That would also explain the behavior you observed. However, without the full
source code these are just assumptions.
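
A quick way to see this (a sketch, assuming the userid variable holds the
column name as in your snippet): count the distinct user ids per partition
after the repartition.

import org.apache.spark.sql.functions.col

val idsPerPartition = data.repartition(col(userid)).rdd
  .mapPartitionsWithIndex { (idx, rows) =>
    // Count the distinct user ids that landed in this partition.
    Iterator((idx, rows.map(_.getAs[Any](userid)).toSet.size))
  }
  .collect()
// Any entry with a count greater than 1 shows several user ids sharing one partition.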

On 4. Jun 2018, at 17:33, Jain, Neha T. <neha.t.j...@accenture.com> wrote:
Hi Jorn,

I tried removing userid from my sort clause, but the issue is still the same:
the data is not sorted.

var newDf = data.repartition(col(userid)).sortWithinPartitions(sid,time)

I am checking the sorting results by temporarily writing the data to Hive as
well as HDFS. When I then look at the data user by user, it is not sorted.
I am attaching the output file for your reference.
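
Note that reading a table back from Hive or HDFS does not, in general,
preserve row order, so a write-then-inspect check can be misleading. A
write-free check along these lines may be more reliable (a sketch; it assumes
sid and time are Long columns, so adjust the getAs types to the real schema):

import scala.math.Ordering.Implicits._

val violations = newDf.rdd.mapPartitions { rows =>
  var prev: Option[(Long, Long)] = None
  var bad = 0L
  rows.foreach { r =>
    val cur = (r.getAs[Long](sid), r.getAs[Long](time))
    if (prev.exists(_ > cur)) bad += 1 // the previous row sorts after this one
    prev = Some(cur)
  }
  Iterator(bad)
}.sum()
// violations == 0 means every partition really is sorted by (sid, time).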

On the basis of the sorting within userid partitions, I want to add a flag
that marks the first item in each partition as true and all other items in
that partition as false.
If the sort order is disturbed, the flag is set incorrectly.
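
A window function would avoid depending on physical partition order for this
flag entirely. A minimal sketch, assuming the column-name variables from the
earlier snippet:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// The first row per userid, in (sid, time) order, gets true; all others false.
val w = Window.partitionBy(col(userid)).orderBy(col(sid), col(time))
val withFlag = newDf.withColumn("isFirst", row_number().over(w) === 1)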

Please suggest what else could be done to fix this very basic scenario of
sorting in Spark across multiple partitions and multiple nodes.

Thanks & Regards,
Neha Jain

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Monday, June 4, 2018 10:48 AM
To: Sing, Jasbir <jasbir.s...@accenture.com>
Cc: user@spark.apache.org; Patel, Payal <payal.pa...@accenture.com>; Jain, Neha T. <neha.t.j...@accenture.com>
Subject: [External] Re: Sorting in Spark on multiple partitions
Subject: [External] Re: Sorting in Spark on multiple partitions

You partition by userid; why do you then sort again by userid within each
partition? Can you try removing userid from the sort?

How do you check if the sort is correct or not?

What is the underlying objective of the sort? Do you have more information on 
schema and data?

On 4. Jun 2018, at 05:47, Sing, Jasbir <jasbir.s...@accenture.com> wrote:
Hi Team,

We are currently using Spark 2.2.0 and facing some challenges sorting data
across multiple partitions.
We have tried the approaches below:

  1.  Spark SQL:

      var query = "select * from data distribute by " + userid + " sort by " + userid + ", " + time

      This query returns correct results in Hive but not in Spark SQL.

  2.  var newDf = data.repartition(col(userid)).orderBy(userid, time)
  3.  var newDf = data.repartition(col(userid)).sortWithinPartitions(userid, time)


But none of the above approaches gives correctly sorted results.
Please suggest what could be done for the same.
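
For reference, this is roughly how we run approach 1 (a sketch; it assumes a
SparkSession named spark and that userid and time hold the column names):

data.createOrReplaceTempView("data")
val query = s"select * from data distribute by $userid sort by $userid, $time"
val sorted = spark.sql(query)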

Thanks & Regards,
Neha Jain

<test.csv>
