Re: GraphX Pregel not update vertex state properly, cause messages loss

2016-11-28 Thread rohit13k
I found the exact issue. If the vertex attribute is a complex object containing
mutable objects, the edge triplets do not pick up the new state once the
vertex attributes have already been shipped. If the vertex attributes are
immutable objects, there is no issue. Below is code that reproduces it. Just
changing the mutable HashMap to an immutable HashMap solves the issue. (This is
not a fix for the bug: either users should be made aware of this limitation,
or the bug needs to be fixed for mutable objects.)

import org.apache.spark.graphx._
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.log4j.{ Level, Logger }
import scala.collection.mutable.HashMap

object PregelTest {

  def run(graph: Graph[HashMap[String, Int], HashMap[String, Int]]): Graph[HashMap[String, Int], HashMap[String, Int]] = {

    // Vertex program. NOTE: `attr += ...` mutates the incoming HashMap in place
    // and returns the same object; this is what triggers the stale triplets.
    def vProg(v: VertexId, attr: HashMap[String, Int], msg: Integer): HashMap[String, Int] = {
      if (msg < 0) {
        // initial message received: vertex 0 is the source, all others start unreachable
        if (v == 0L) attr += ("LENGTH" -> 0)
        else attr += ("LENGTH" -> Integer.MAX_VALUE)
      } else {
        attr += ("LENGTH" -> (msg + 1))
      }
    }

    def sendMsg(triplet: EdgeTriplet[HashMap[String, Int], HashMap[String, Int]]): Iterator[(VertexId, Integer)] = {
      val len = triplet.srcAttr("LENGTH")
      // send a message only if the source vertex is reachable
      if (len < Integer.MAX_VALUE) Iterator((triplet.dstId, Integer.valueOf(len)))
      else Iterator.empty
    }

    // keep the shorter of two incoming lengths
    def mergeMsg(msg1: Integer, msg2: Integer): Integer = {
      if (msg1 < msg2) msg1 else msg2
    }

    Pregel(graph, Integer.valueOf(-1), 3, EdgeDirection.Either)(vProg, sendMsg, mergeMsg)
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Pregel Test").setMaster("local")
    val sc = new SparkContext(conf)

    // create the simplest test graph: 3 vertices and 2 edges (0 -> 1 -> 2)
    val vertexList = Array(
      (0L, new HashMap[String, Int]),
      (1L, new HashMap[String, Int]),
      (2L, new HashMap[String, Int]))
    val edgeList = Array(
      Edge(0L, 1L, new HashMap[String, Int]),
      Edge(1L, 2L, new HashMap[String, Int]))

    val vertexRdd = sc.parallelize(vertexList)
    val edgeRdd = sc.parallelize(edgeList)
    val g = Graph[HashMap[String, Int], HashMap[String, Int]](vertexRdd, edgeRdd)

    // run the test code and print the final vertex attributes
    val lpa = run(g)
    lpa.vertices.collect().foreach(println)
    sc.stop()
  }
}
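A minimal sketch of the immutable variant described above, assuming the rest of the repro stays the same. To keep it runnable without Spark, `type VertexId = Long` is redeclared locally (GraphX defines the same alias), the message type is `Int` rather than `java.lang.Integer`, and the object name `ImmutableVProg` is hypothetical. The point is that every branch returns a new `Map` and leaves `attr` untouched.

```scala
// Hypothetical, Spark-free sketch of the immutable vertex program.
object ImmutableVProg {
  // GraphX declares the same alias; redeclared here so the sketch stands alone.
  type VertexId = Long

  // Every branch builds a NEW map with `+`; the incoming `attr` is never modified.
  def vProg(v: VertexId, attr: Map[String, Int], msg: Int): Map[String, Int] =
    if (msg < 0) {
      // initial message: vertex 0 is the source, all others start unreachable
      if (v == 0L) attr + ("LENGTH" -> 0) else attr + ("LENGTH" -> Int.MaxValue)
    } else {
      attr + ("LENGTH" -> (msg + 1))
    }
}
```

In the repro this corresponds to using Scala's default immutable `Map[String, Int]` in place of `scala.collection.mutable.HashMap[String, Int]` for the vertex attribute type.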



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100p28139.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: GraphX Pregel not update vertex state properly, cause messages loss

2016-11-23 Thread rohit13k
I created a JIRA issue for this:

https://issues.apache.org/jira/browse/SPARK-18568



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100p28124.html

Re: GraphX Pregel not update vertex state properly, cause messages loss

2016-11-23 Thread rohit13k
Hi 

I am facing a similar issue. It's not that the message is getting lost. Vertex
1's attributes change in superstep 1, but when sendMsg reads the vertex
attribute from the edge triplet in the 2nd superstep, it still has the old
value of vertex 1, not the latest one. So, as per your code, no new message is
generated in that superstep. I think the bug is in the ReplicatedVertexView,
where the srcAttr and dstAttr of the EdgeTriplet are updated from the latest
version of the vertex after each superstep.
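To make the symptom concrete, here is a toy model (hypothetical, not GraphX source code) built on one assumption: a vertex's attribute is re-shipped to the edge triplets only if its new value compares unequal (`!=`) to its old value. If vProg mutates a HashMap in place and returns it, the old and new entries are the same object, so no change is detected and the shipped copy stays stale; returning a new object is detected as a change.

```scala
import scala.collection.mutable

// Toy model (hypothetical, not GraphX code) of equality-based change detection.
object StaleReplicaModel {
  // Ids whose new attribute differs from the old one (or that had no old attribute).
  def changed[A](oldAttrs: Map[Long, A], newAttrs: Map[Long, A]): Set[Long] =
    newAttrs.collect { case (id, a) if oldAttrs.get(id).forall(_ != a) => id }.toSet

  // In-place update: old and new entries are the SAME object, so nothing is detected.
  def mutableCase(): Set[Long] = {
    val attr = mutable.HashMap("LENGTH" -> 1)
    val before = Map(1L -> attr)
    attr += ("LENGTH" -> 2)
    changed(before, Map(1L -> attr))
  }

  // Fresh immutable map: old and new values differ, so vertex 1 is detected.
  def immutableCase(): Set[Long] = {
    val attr = Map("LENGTH" -> 1)
    changed(Map(1L -> attr), Map(1L -> (attr + ("LENGTH" -> 2))))
  }
}
```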

How do I get this bug raised? I am struggling to find a proper solution other
than recreating the graph after every superstep to force the edge triplets to
have the latest vertex values, but this is not a good solution
performance-wise.
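A possibly cheaper alternative to rebuilding the graph every superstep, assuming the stale triplets come from vProg mutating its input in place (as the 2016-11-28 message in this thread concluded): copy the attribute before updating it, so a fresh HashMap is returned. `CopyOnUpdate` is a hypothetical name, `VertexId` is redeclared locally as `Long`, and the message type is `Int`.

```scala
import scala.collection.mutable

// Hypothetical copy-on-update vertex program for a mutable HashMap attribute.
object CopyOnUpdate {
  type VertexId = Long

  // Same length logic as the repro, but it clones `attr` first, so the
  // previous attribute object is left unchanged and a new one is returned.
  def vProg(v: VertexId, attr: mutable.HashMap[String, Int], msg: Int): mutable.HashMap[String, Int] = {
    val updated = attr.clone()
    val length = if (msg < 0) { if (v == 0L) 0 else Int.MaxValue } else msg + 1
    updated += ("LENGTH" -> length)
  }
}
```

The trade-off is one map copy per active vertex per superstep instead of rebuilding the whole graph.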





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100p28123.html

GraphFrame graph partitioning

2016-05-25 Thread rohit13k
How can I partition a graph in GraphFrames, similar to the partitionBy feature
in GraphX? Can we use the DataFrame repartition feature in Spark 1.6 to
provide graph partitioning in GraphFrames?
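One way to frame the question: GraphX's `EdgePartition1D` strategy places each edge by its source vertex id, so all out-edges of a vertex share a partition. The plain-Scala helper below is a hypothetical sketch of that idea only; it uses `java.lang.Long.hashCode` with `Math.floorMod` and is not GraphX's exact hash. With DataFrames, the analogous step would presumably be repartitioning the GraphFrame's edges DataFrame on its `src` column (for example `edges.repartition(numParts, col("src"))`), which uses Spark's own hashing; whether GraphFrames algorithms preserve that layout is a separate question.

```scala
// Hypothetical 1D edge-partitioning helper (source-id hashing only).
object EdgePartitionSketch {
  // Assigns an edge to a partition using only its source id, so every
  // out-edge of the same vertex lands in the same partition.
  def partitionBySrc(src: Long, numParts: Int): Int = {
    require(numParts > 0, "numParts must be positive")
    // floorMod keeps the result in [0, numParts) even for negative hashes
    java.lang.Math.floorMod(java.lang.Long.hashCode(src), numParts)
  }
}
```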



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphFrame-graph-partitioning-tp27024.html

EdgeTriplet showing two versions of the same vertex

2015-10-14 Thread rohit13k
Hi

I have a scenario where I do graph.vertices.collect() on the graph and get the
5 vertices I added. Each vertex is a Scala object, as shown below:

import scala.collection.mutable

class NodeExact(nodeId: Long, summ: Array[mutable.Map[Long, Long]]) extends Serializable {
  var node: Long = nodeId
  var currentsuperstep = 0
  var summary: Array[mutable.Map[Long, Long]] = summ
  var ischanged = false

  // Number of distinct keys, over all non-null summary maps, whose time is
  // greater than `window`. `filter` and `++` return new collections, so their
  // results must be kept (the original discarded them).
  def getsummary(window: Long): Int =
    summary
      .filter(m => m != null)
      .map(m => m.filter { case (_, time) => time > window }.keySet)
      .foldLeft(Set.empty[Long])(_ ++ _)
      .size
}

There are multiple edges between nodes in the graph, i.e. both a -> b and
b -> a. Now, when I do
graph.triplets.collect()

I get edge triplets with source id *a*, but for some of them the srcAttr of
*a* is not the same as the value of *a* in the vertexRDD, whereas for others
it is the same.

I do not understand how the srcAttr of the same vertex can have different
values in two edge triplets. Shouldn't it always match the attr in the
vertexRDD?

Please let me know if I am missing something.

Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/EdgeTriplet-showing-two-versions-of-the-same-vertex-tp25058.html

Re: graphx - mutable?

2015-10-14 Thread rohit13k
Hi

I am also working in the same area, where the graph evolves over time, and the
current approach of rebuilding the graph again and again is very slow and
memory-consuming. Did you find any workaround? What was your use case?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/graphx-mutable-tp15777p25057.html