wenbinwei created SPARK-23678:
---------------------------------

             Summary: a more efficient partition strategy
                 Key: SPARK-23678
                 URL: https://issues.apache.org/jira/browse/SPARK-23678
             Project: Spark
          Issue Type: New Feature
          Components: GraphX
    Affects Versions: 2.4.0
            Reporter: wenbinwei


Recently, I found a new partition strategy (called EdgePartitionTriangle), which 
combines the partition strategies EdgePartition2D and 
CanonicalRandomVertexCut. This partition strategy has three advantages:
 1. a tighter bound on vertex replication: sqrt(2 * numParts), compared with 
    2 * sqrt(numParts) for EdgePartition2D.
 2. it colocates all edges between two vertices regardless of direction.
 3. the same work balance as EdgePartition2D.
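
To put the first advantage in numbers, here is a minimal sketch that evaluates the bound claimed in this issue next to the 2 * sqrt(numParts) bound documented for EdgePartition2D. The object and method names are made up for this illustration and are not part of GraphX.

```scala
object ReplicationBounds {
  // Bound documented for EdgePartition2D in GraphX.
  def bound2D(numParts: Int): Double = 2 * math.sqrt(numParts.toDouble)

  // Bound claimed for EdgePartitionTriangle in this issue.
  def boundTriangle(numParts: Int): Double = math.sqrt(2.0 * numParts)

  def main(args: Array[String]): Unit = {
    // Print both bounds side by side for a few partition counts.
    for (n <- Seq(10, 36, 55, 100)) {
      println(f"numParts=$n%4d  2D=${bound2D(n)}%.2f  triangle=${boundTriangle(n)}%.2f")
    }
  }
}
```

The ratio between the two bounds is sqrt(2) for every numParts, so the claimed bound is roughly 29% lower.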

See 
[https://github.com/weiwee/edgePartitionTri/blob/master/EdgePartitionTriangle.ipynb]

The main idea is to first partition the edges virtually with EdgePartition2D, 
which yields the partitions

{(i,j) | i=0,1,..,k, j=0,1,..,k}

Then the virtual partitions are folded across the diagonal, so that each pair 
of mirrored partitions is relocated to a single partition:

(1,0) and (0,1) -> (1, 0)

(2,1) and (1,2) -> (2, 1)

...

(k, k-1) and (k-1, k) -> (k, k-1)

Finally, {(1,0), (2,0), (2,1), (3,0), (3,1), (3,2), ..., (k, k-1)} is mapped to 
{0, 1, ..., k*(k+1)/2 - 1}
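
The folding and the final numbering can be sketched as follows. This is only an illustration of the idea, not the full strategy; the object TriangleFold and its methods are hypothetical names, and the index formula is the one used in the last line of the code posted below.

```scala
object TriangleFold {
  // Fold an off-diagonal virtual cell (i, j) onto the lower triangle (row > col),
  // so that (i, j) and (j, i) end up in the same cell.
  def fold(i: Int, j: Int): (Int, Int) = {
    require(i != j, "diagonal cells are relocated separately")
    if (i > j) (i, j) else (j, i)
  }

  // Row-by-row numbering of lower-triangle cells: (1,0), (2,0), (2,1), (3,0), ...
  def index(row: Int, col: Int): Int = {
    require(row > col && col >= 0, "expected a lower-triangle cell")
    row * (row - 1) / 2 + col
  }

  def main(args: Array[String]): Unit = {
    // Both directions of an edge between the same two cells get one index.
    val (r1, c1) = fold(1, 2)
    val (r2, c2) = fold(2, 1)
    println(index(r1, c1) == index(r2, c2))
  }
}
```

Because both (i, j) and (j, i) fold to the same cell, edges between two vertices share a partition regardless of direction.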

The complete method needs to handle two more details:

1. When numParts is not a triangular number, the partitions are divided into 
two types: triangleParts and rests. The latter are partitioned by a different 
strategy.

2. When edges are virtually located in a diagonal partition (a, a), they should 
be relocated to the partitions

{(a, 0), (a, 1),..., (a, a-1), (a+1, a),...,(k, a)}

to achieve better work balance.
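
Both details can be illustrated with a small sketch. It is not the full strategy: split mirrors the first lines of the getPartition code posted in this issue, and relocate is a simplified form of its diagonal branch. The object name TriangleDetails, the method names, and the explicit ind parameter are hypothetical.

```scala
object TriangleDetails {
  // Detail 1: largest k such that k * (k + 1) / 2 <= numParts.
  def numRowTriParts(numParts: Int): Int =
    ((math.sqrt(1 + 8.0 * numParts) - 1) / 2).toInt

  // Split numParts into (triangleParts, rests).
  def split(numParts: Int): (Int, Int) = {
    val k = numRowTriParts(numParts)
    val tri = k * (k + 1) / 2
    (tri, numParts - tri)
  }

  // Detail 2: move the diagonal cell (a, a) to one of the k cells that share index a.
  // `ind` in [0, k) selects the target; in the posted code it is derived from the
  // hashed vertex ids (assumption: any roughly uniform choice serves the sketch).
  def relocate(a: Int, ind: Int, k: Int): (Int, Int) = {
    require(0 <= a && a <= k && 0 <= ind && ind < k)
    val other = (ind + a + 1) % (k + 1) // cycles through every index except a
    if (a > other) (a, other) else (other, a)
  }

  // All k targets of the diagonal cell (a, a).
  def targets(a: Int, k: Int): Set[(Int, Int)] =
    (0 until k).map(ind => relocate(a, ind, k)).toSet

  def main(args: Array[String]): Unit = {
    println(split(12))
    println(targets(2, 4))
  }
}
```

For example, split(12) gives (10, 2), and targets(2, 4) contains exactly the cells (2,0), (2,1), (3,2) and (4,2), which is the set listed above for a = 2 and k = 4.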

Code:
{code:java}
import org.apache.spark.graphx.{PartitionID, PartitionStrategy, VertexId}

object EdgePartitionTriangle extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val mixingPrime: VertexId = 1125899906842597L
    // Largest r with r * (r + 1) / 2 <= numParts; the triangle uses rows and columns 0..r.
    val numRowTriParts = ((math.sqrt(1 + 8.0 * numParts) - 1) / 2).toInt
    val numTriParts = numRowTriParts * (numRowTriParts + 1) / 2
    val segmentFactor = 100 // must be a positive even number
    // The floating-point product avoids Int overflow for large numParts.
    val numSegments = (segmentFactor * math.sqrt(4.0 * numParts * numTriParts)).toInt
    val segRow = (math.abs(src * mixingPrime) % numSegments).toInt
    val segCol = (math.abs(dst * mixingPrime) % numSegments).toInt
    var row = segRow / (segmentFactor * numRowTriParts)
    var col = segCol / (segmentFactor * numRowTriParts)
    if (math.max(segRow, segCol) >= 2 * segmentFactor * numTriParts) {
      // Outside the triangle region: use one of the (numParts - numTriParts) remaining partitions.
      row = numRowTriParts + 1
      col = math.min(segRow, segCol) % (numParts - numTriParts)
    } else if (row == col) {
      // Diagonal cell: spread its edges over the other cells that share this row/column index.
      val ind = math.min(segRow % numRowTriParts, segCol % numRowTriParts)
      col = (math.min(2 * numRowTriParts - ind - 1, ind) + row + 1) % (numRowTriParts + 1)
    }
    // Fold (row, col) and (col, row) onto the same lower-triangle index.
    if (row > col) row * (row - 1) / 2 + col else col * (col - 1) / 2 + row
  }
}
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
