Github user spupyrev commented on a diff in the pull request: https://github.com/apache/giraph/pull/82#discussion_r239886833 --- Diff: giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java --- @@ -0,0 +1,128 @@ +package org.apache.giraph.examples; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.log4j.Logger; + +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.DiffusionVertexValue; + +public class DiffusionMigrationSimulationComputation extends MigrationFullBasicComputation<LongWritable, DiffusionVertexValue, NullWritable, IntWritable> { + + Logger LOG = Logger.getLogger(this.getClass()); + + + + /*public void initialize(GraphState graphState, + WorkerClientRequestProcessor<LongWritable, DiffusionVertexValue, NullWritable> workerClientRequestProcessor, + CentralizedServiceWorker<LongWritable, DiffusionVertexValue, NullWritable> serviceWorker, + WorkerGlobalCommUsage workerGlobalCommUsage) { + super.initialize(graphState, workerClientRequestProcessor, serviceWorker, workerGlobalCommUsage); + delta = getConf().getDouble(DiffusionMigrationMasterCompute.diffusionDeltaOption, DiffusionMigrationMasterCompute.diffusionDeltaOptionDefault); + modelSwitch = getConf().getBoolean(DiffusionMigrationMasterCompute.diffusionListenOption, false); + + }*/ + + @Override + public void compute(Vertex<LongWritable, DiffusionVertexValue, NullWritable> vertex, Iterable<IntWritable> msgs) + throws IOException { + + DiffusionVertexValue value = vertex.getValue(); + if(getSuperstep()==0) { //First superstep just to know the first label to analyze + setup(value); + }else { + boolean byLabel = ((BooleanWritable)getAggregatedValue(DiffusionMigrationMasterCompute.byLabelOption)).get(); + int currentLabel = (int)((LongWritable)getAggregatedValue(DiffusionMigrationMasterCompute.currentLabel)).get(); + // aggregators must be analyzed after first superstep + boolean timeToSwitch = ((BooleanWritable)getAggregatedValue(DiffusionMigrationMasterCompute.timeToSwitch)).get(); + if (timeToSwitch)//time to switch label? + if(value.getLabel()<currentLabel) + aggregate(DiffusionMigrationMasterCompute.nextLabel, new LongWritable(value.getLabel())); + if(value.isVertexInvited(currentLabel)) { + if (byLabel) { + if(!value.isVertexDead() && getSuperstep()!=1) { //Update the using probability, if not dead + int activeNeighbors = checkMsgsAndUpdateProbability(msgs, value); + if(activeNeighbors==value.getVertexThreshold()) + aggregate(DiffusionMigrationMasterCompute.hesitantVerticesAggregator,new LongWritable(1)); + } + aggregateVerticesBasedOnProbability(value); + if(value.rollActivationDice()) { //if this vertex is using the feature + aggregate(DiffusionMigrationMasterCompute.usingVerticesAggregator, new LongWritable(1)); + sendMessageToAllEdges(vertex, new IntWritable(1)); + } + }else {//computation by min number + if(!timeToSwitch) { + boolean justChanged = ((BooleanWritable)getAggregatedValue(DiffusionMigrationMasterCompute.justChangedTimeToSwitch)).get(); + //Update the using probability if not dead and the computation has not just became active + //(because we don't have old messages sent so it would wrongly decrease the probability) + if(!value.isVertexDead() && !justChanged) { + int activeNeighbors = checkMsgsAndUpdateProbability(msgs, value); + if(activeNeighbors==value.getVertexThreshold()) + aggregate(DiffusionMigrationMasterCompute.hesitantVerticesAggregator,new LongWritable(1)); + } + aggregateVerticesBasedOnProbability(value); + if(value.rollActivationDice()) { //if this vertex is using the feature + aggregate(DiffusionMigrationMasterCompute.usingVerticesAggregator, new LongWritable(1)); + sendMessageToAllEdges(vertex, new IntWritable(1)); + } + }else{ //if computation is not active and we're using a min number switch model + aggregate(DiffusionMigrationMasterCompute.potentialVerticesAggregator, new LongWritable(1)); + } + } + } + //vertex.voteToHalt(); + } + + + } + + private void setup(DiffusionVertexValue value) { + double delta = Double.parseDouble(getConf().getStrings("Delta", "0.005")[0]); + value.setDelta(delta); + double initialActivationProbability = Double.parseDouble(getConf().getStrings("InitialProbability","0.02")[0]); --- End diff -- Please use org.apache.giraph.conf.FloatConfOption instead
---