Hello,

I'm attaching PiEstimator with some changes. I'm sending exactly two
messages to "master" from each node.

      bspPeer.send(masterTask, estimate);
      LOG.info("Send message:" + System.currentTimeMillis());
      bspPeer.send(masterTask, estimate2);
      LOG.info("Send message:" + System.currentTimeMillis());

After that I'm trying to receive all messages:
     LOG.info("Num msg = " + bspPeer.getNumCurrentMessages());
      while ((received = bspPeer.getCurrentMessage()) != null) {
        LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));
       ....
      }

In the log file I see:
2011-02-14 09:46:07,124 INFO org.apache.hama.bsp.TaskRunner:
attempt_201102140938_0003_m_000000_0 11/02/14 09:46:07 INFO
algorithms.PiEstimator$MyEstimator: Send message:1297673167123
2011-02-14 09:46:07,125 INFO org.apache.hama.bsp.TaskRunner:
attempt_201102140938_0003_m_000000_0 11/02/14 09:46:07 INFO
algorithms.PiEstimator$MyEstimator: Send message:1297673167125
2011-02-14 09:46:07,270 INFO org.apache.hama.bsp.TaskRunner:
attempt_201102140938_0003_m_000000_0 11/02/14 09:46:07 INFO
algorithms.PiEstimator$MyEstimator: Receives messages:3.1536
2011-02-14 09:46:07,270 INFO org.apache.hama.bsp.TaskRunner:
attempt_201102140938_0003_m_000000_0 11/02/14 09:46:07 INFO
algorithms.PiEstimator$MyEstimator: Receives messages:3.148
2011-02-14 09:46:07,271 INFO org.apache.hama.bsp.TaskRunner:
attempt_201102140938_0003_m_000000_0 11/02/14 09:46:07 INFO
algorithms.PiEstimator$MyEstimator: Receives messages:3.1392
2011-02-14 09:46:07,272 INFO org.apache.hama.bsp.TaskRunner:
attempt_201102140938_0003_m_000000_0 11/02/14 09:46:07 INFO
algorithms.PiEstimator$MyEstimator:
2011-02-14 09:46:07,272 INFO org.apache.hama.bsp.TaskRunner:
attempt_201102140938_0003_m_000000_0 Estimated value of PI is
3.1450000000000005


On each node I see that two messages are sent (so master should receive 6
messages - I have 3 nodes). In the log file I see only 3 records
"Receives...", so only 3 messages were processed by master task. I don'
understand why is that.

What's more, I have tried to change line:
while ((received = bspPeer.getCurrentMessage()) != null) {

to
if ((received = bspPeer.getCurrentMessage()) != null) {

So there should be messages in the queue which are not processed. In my
opinion bsp() should be called again and again... and in consequence falls
in loop of calls. But it doesn't happen - bsp() method is run exactly once
and framework is probably hanging after last line of the bsp() code. Could
you explain this for me?

Best regards,
Pawel



2011/2/10 Edward J. Yoon <[email protected]>

> It should be possible to send multiple messages to other nodes.
>
> Could you please show me your source code and debug level log?
>
>
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPMessage;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.BSPPeerProtocol;
import org.apache.hama.util.Bytes;
import org.apache.zookeeper.KeeperException;

public class PiEstimator {
  private static String MASTER_TASK = "master.task.";

  public static class MyEstimator extends BSP {
    public static final Log LOG = LogFactory.getLog(MyEstimator.class);
    private Configuration conf;
    private String masterTask;
    private static final int iterations = 10000;

    public void bsp(BSPPeerProtocol bspPeer) throws IOException, KeeperException,
        InterruptedException {
      int in = 0, out = 0;
      for (int i = 0; i < iterations; i++) {
        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
        if ((Math.sqrt(x * x + y * y) < 1.0)) {
          in++;
        } else {
          out++;
        }
      }

      byte[] tagName = Bytes.toBytes(bspPeer.getPeerName());
      byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
      BSPMessage estimate = new BSPMessage(tagName, myData);
      BSPMessage estimate2 = new BSPMessage(tagName, myData);

      bspPeer.send(masterTask, estimate);
      LOG.info("Send message:" + System.currentTimeMillis());
      bspPeer.send(masterTask, estimate2);
      LOG.info("Send message:" + System.currentTimeMillis());
      bspPeer.sync();

      double pi = 0.0;
      BSPMessage received;
      LOG.info("Num msg = " + bspPeer.getNumCurrentMessages());
      while ((received = bspPeer.getCurrentMessage()) != null) {
        LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));
        if(pi == 0.0) {
          pi = Bytes.toDouble(received.getData());
        } else {
          pi = (pi + Bytes.toDouble(received.getData())) / 2;
        }
      }

      if (pi != 0.0) {
        LOG.info("\nEstimated value of PI is " + pi);
      }

    }

    public Configuration getConf() {
      return conf;
    }

    public void setConf(Configuration conf) {
      this.conf = conf;
      this.masterTask = conf.get(MASTER_TASK);
    }

  }

  public static void main(String[] args) throws InterruptedException,
      IOException {
    // BSP job configuration
    HamaConfiguration conf = new HamaConfiguration();

    BSPJob bsp = new BSPJob(conf, PiEstimator.class);
    // Set the job name
    bsp.setJobName("pi estimation example");
    bsp.setBspClass(MyEstimator.class);
    
    BSPJobClient jobClient = new BSPJobClient(conf);
    ClusterStatus cluster = jobClient.getClusterStatus(true);
    
    if(args.length > 0) {
      bsp.setNumBspTask(Integer.parseInt(args[0]));
    } else {
      // Set to maximum
      bsp.setNumBspTask(cluster.getGroomServers());
    }
    
    // Choose one as a master
    for (String peerName : cluster.getActiveGroomNames().values()) {
      conf.set(MASTER_TASK, peerName);
      break;
    }

    long startTime = System.currentTimeMillis();
    BSPJobClient.runJob(bsp);
    System.out.println("Job Finished in "+
        (double)(System.currentTimeMillis() - startTime)/1000.0 + " seconds");
  }
}

Reply via email to