Hi All,

I have been facing a problem in Giraph for the last two weeks. Below are a detailed description, a code snippet, the corresponding log messages, and a few questions. I would greatly appreciate any help.

Code Description:
=================
My algorithm assigns a score to selected vertices of the graph. I have to find the best K (=5) vertices, i.e. the K vertices with the lowest scores. For this I have written a persistent aggregator that takes a Text value. A snippet of the code is given below. It works fine on my laptop (single worker), but not on a cluster of 4 machines running 10 workers. I am using Hadoop 1.0.0 and Giraph 1.0.0.

Problem Description:
====================
One of the previously aggregated values suddenly gets lost in the aggregator, so my algorithm produces wrong results. You can see this in the log messages below:

a. At line 11, the vertex 8.4793-1 has weight 3.0, and it is the best so far.
b. At line 12, the aggregator receives a new value "6.1100-1,6.0", i.e. vertex=6.1100-1 has weight=6.0.
c. At line 13, at the beginning of the aggregate method, the output of getAggregatedValue() is already the same as the new value. In other words, the aggregator has simply forgotten the previously aggregated value. Why?

I have also printed the object identity hash in each log message to confirm that all of this happens in the same aggregator object.
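
The intended behaviour of the omitted merge step can be sketched without Giraph as follows. This is an illustrative stand-in rather than the actual aggregator code: the class name `TopKMerge`, the `;` separator between entries, and the use of plain `String` instead of Hadoop's `Text` are all assumptions made for the example. It only shows what result is expected when the value from log line 12 is merged into the value from log line 11.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Giraph-independent sketch of a "keep the K lowest scores" merge (hypothetical helper). */
final class TopKMerge {

    /**
     * Merges two serialized lists of "vertexId,score" entries separated by ';'
     * (an assumed format) and returns the k lowest-scoring entries in the same format.
     * Entries are assumed to be well-formed.
     */
    static String merge(String current, String incoming, int k) {
        // vertexId -> lowest score seen so far for that vertex
        Map<String, Double> best = new LinkedHashMap<>();
        for (String list : new String[] {current, incoming}) {
            if (list == null || list.isEmpty()) {
                continue; // nothing aggregated yet
            }
            for (String entry : list.split(";")) {
                int comma = entry.lastIndexOf(',');
                String vertexId = entry.substring(0, comma);
                double score = Double.parseDouble(entry.substring(comma + 1));
                best.merge(vertexId, score, Math::min);
            }
        }

        // Sort ascending by score; the sort is stable, so ties keep arrival order.
        List<Map.Entry<String, Double>> sorted = new ArrayList<>(best.entrySet());
        sorted.sort(Map.Entry.comparingByValue());

        StringBuilder out = new StringBuilder();
        for (int i = 0; i < Math.min(k, sorted.size()); i++) {
            if (i > 0) {
                out.append(';');
            }
            out.append(sorted.get(i).getKey()).append(',').append(sorted.get(i).getValue());
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Values taken from log lines 11 and 12.
        System.out.println(merge("8.4793-1,3.0", "6.1100-1,6.0", 5));
        // prints "8.4793-1,3.0;6.1100-1,6.0"
    }
}
```

With K=5, both vertices should survive the merge and 8.4793-1 should stay in first place. In the cluster run, the entry for 8.4793-1 is already gone before any merge logic executes.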

Source Code Snippet:
====================
@Override
public void aggregate(Text value) {
    printLogMessage("aggregate", "MyAGG", "Entering - NewValue=", value);
    printLogMessage("aggregate", "MyAGG", "Entering - getAggregatedValue()=", getAggregatedValue());

    // Main Logic is here - Omitting for simplicity

    setAggregatedValue(new Text(myTopK));
    printLogMessage("aggregate", "MyAGG", "Exiting AggregatedValue=", getAggregatedValue().toString());
}

@Override
public Text createInitialValue() {
    printLogMessage("=====>createInitialValue", "MyAGG", "Entering getAggregatedValue()=", getAggregatedValue());
    Text initialValue = getAggregatedValue();
    if (initialValue == null) {
        initialValue = new Text();
    }
    printLogMessage("=====>createInitialValue", "MyAGG", "Exiting ReturnedValue=", getAggregatedValue());
    return initialValue;
}

private void printLogMessage(String methodName, String prefix, String msg, Object obj) {
    if (isDebugOn(methodName)) {
        if (obj == null) {
            obj = "";
        }
        System.out.println(java.lang.System.identityHashCode(this) + "--" + prefix + "==>" + methodName + "() - " + msg + obj.toString());
    }
}

Printed Log Messages:
=====================
1. %%%%%%%%%%%%% Entered Master Compute %%%%%%%%%%%%%%%%%%%% ====> SupsetStep No.=2
2. 249768912--MyAGG==>=====>createInitialValue() - Entering getAggregatedValue()=
3. 249768912--MyAGG==>=====>createInitialValue() - Exiting ReturnedValue=
4. 249768912--MyAGG==>aggregate() - Entering - NewValue=8.4793-1,3.0
5. 249768912--MyAGG==>aggregate() - Entering - getAggregatedValue()=8.4793-1,3.0
6. 249768912--MyAGG==>aggregate() - Received AggregatedValue Again- Exiting CurrVal=8.4793-1,3.0
7. 249768912--MyAGG==>=====>createInitialValue() - Entering getAggregatedValue()=8.4793-1,3.0
8. 249768912--MyAGG==>=====>createInitialValue() - Exiting ReturnedValue=8.4793-1,3.0
9. %%%%%%%%%%%%% Entered Master Compute %%%%%%%%%%%%%%%%%%%% ====> SupsetStep No.=3
10. 249768912--MyAGG==>=====>createInitialValue() - Entering getAggregatedValue()=8.4793-1,3.0
11. 249768912--MyAGG==>=====>createInitialValue() - Exiting ReturnedValue=8.4793-1,3.0
12. 249768912--MyAGG==>aggregate() - Entering - NewValue=6.1100-1,6.0
13. 249768912--MyAGG==>aggregate() - Entering - getAggregatedValue()=6.1100-1,6.0
14. 249768912--MyAGG==>aggregate() - Received AggregatedValue Again- Exiting CurrVal=6.1100-1,6.0
15. 249768912--MyAGG==>=====>createInitialValue() - Entering getAggregatedValue()=6.1100-1,6.0
16. 249768912--MyAGG==>=====>createInitialValue() - Exiting ReturnedValue=6.1100-1,6.0
17. %%%%%%%%%%%%% Entered Master Compute %%%%%%%%%%%%%%%%%%%% ====> SupsetStep No.=4
18. ... ...

My Queries:
===========
1. Why does the createInitialValue method get called twice in the same superstep, on the same aggregator object?
2. Even if it gets called twice, why should the aggregator forget the previously aggregated value?

Please help me resolve this issue. It is a major problem in my work, and I cannot proceed further because of it.

Thanks in anticipation.

- Puneet
IIT Delhi, India