I don’t think there’s any reason to assume that each consumer will process an equal number of messages.
-JZ

On Nov 17, 2013, at 2:51 PM, Sznajder ForMailingList <[email protected]> wrote:

> Hi Jordan,
>
> Regarding the output:
>
> As you can see, the LOG prints the name of the consumer, "processed ", and the item....
>
> I am running the program with 4 servers in my quorum:
> the ir-hadoop1 server is a producer
> ir-hadoop2 --> ir-hadoop4 are consumers.
>
> After 14 minutes, I simply count the number of processed items on each one of the consumers (a simplistic grep on the LOG file) and I get the following:
>
> ir-hadoop2 : 3042 processed items
> ir-hadoop3 : 1276 processed items
> ir-hadoop4 : 830 processed items...
>
> If I have a look at the processing times, I can see that ir-hadoop4 is idle most of the time... I attach here the LOG corresponding to ir-hadoop4, for example.
>
> Benjamin
>
>
> On Mon, Nov 18, 2013 at 12:03 AM, Jordan Zimmerman <[email protected]> wrote:
> The example output is missing. Please provide that too.
>
> -Jordan
>
> On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <[email protected]> wrote:
>
>> First of all, thank you for your answer.
>>
>> Here is the simple code I used:
>>
>> The producer and queue consumer are given in the classes below.
>>
>> Every 5 minutes, I am printing the number of processed items, and I see some drastic differences between the different consumers:
>>
>>
>>
>> Producer:
>> =-=-=-=-=
>>
>> package com.zk;
>>
>> import java.io.Closeable;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>> import java.util.List;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.api.CuratorEvent;
>> import org.apache.curator.framework.api.CuratorListener;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>>
>> public class QueueProducer implements Closeable {
>>
>>     final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>>
>>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>
>>     protected static final String PATH = "/test_queue";
>>
>>     protected static final String LOCK_PATH = "/test_lock_queue";
>>
>>     private DistributedQueue<CrawlUrl> queue;
>>
>>     private static final int QUEUE_SIZE = 100000;
>>
>>     private int items;
>>
>>     public QueueProducer(CuratorFramework framework) throws Exception {
>>         LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>>         System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>>         this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
>>         this.queue.start();
>>         addQueueContent(QUEUE_SIZE);
>>         System.out.println("Done with the initial init");
>>
>>         // We register the listener for monitoring the number of elements
>>         // in the queue
>>         framework.getCuratorListenable().addListener(new CuratorListener() {
>>             @Override
>>             public void eventReceived(final CuratorFramework framework_, CuratorEvent event) throws Exception {
>>                 if (event.getPath() != null && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>>                     // this also restores the notification
>>                     List<String> children = framework_.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>>                     if (children.size() <= QUEUE_SIZE / 2) {
>>                         addQueueContent(QUEUE_SIZE - children.size());
>>                     }
>>                 }
>>             }
>>         });
>>
>>         while (true) {
>>             List<String> children = framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>>             if (children.size() <= QUEUE_SIZE / 2) {
>>                 LOG.info(dateFormat.format(new Date()) + " - In the while(true) - We call for size " + children.size());
>>                 addQueueContent(QUEUE_SIZE - children.size());
>>             }
>>
>>             Thread.sleep(5000);
>>         }
>>     }
>>
>>     void addQueueContent(int numberOfItems) {
>>         LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
>>         for (int i = 0; i < numberOfItems; i++) {
>>             try {
>>                 CrawlUrl url = new CrawlUrl("" + this.items++);
>>                 this.queue.put(url);
>>             } catch (Exception e) {
>>                 LOG.error("Caught an error when adding the item " + i + " in the initQueueContent()");
>>             }
>>         }
>>     }
>>
>>     public static void main(String[] args) {
>>         CrawlerPropertyFile props;
>>         try {
>>             props = new CrawlerPropertyFile(args[0]);
>>
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>>
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>
>>             @SuppressWarnings("unused")
>>             QueueProducer producer = new QueueProducer(framework);
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         this.queue.close();
>>     }
>>
>> }
>>
>>
>>
>>
>> Consumer
>> =-=-=-=-=-
>>
>> package com.zk;
>>
>> import java.io.Closeable;
>> import java.io.File;
>> import java.io.FileWriter;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.framework.recipes.queue.QueueConsumer;
>> import org.apache.curator.framework.state.ConnectionState;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>>
>> public class MyQueueConsumer implements Closeable {
>>
>>     private DistributedQueue<CrawlUrl> queue;
>>
>>     String name;
>>
>>     String id;
>>
>>     FileWriter timeCounter;
>>
>>     final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);
>>
>>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>
>>     int numberOfProcessedURL;
>>
>>     private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>>
>>     private class FileWriterThread extends Thread {
>>
>>         public FileWriterThread() {
>>             // empty ctor
>>         }
>>
>>         @Override
>>         public void run() {
>>             // We write the stats:
>>             try {
>>                 while (true) {
>>                     MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " " + "[numberOfProcessed=" + MyQueueConsumer.this.numberOfProcessedURL + "]\n");
>>                     MyQueueConsumer.this.timeCounter.flush();
>>
>>                     // Sleeps 5 minutes
>>                     Thread.sleep(300000);
>>                 }
>>             } catch (Exception e) {
>>                 // TODO Auto-generated catch block
>>                 e.printStackTrace();
>>             }
>>         }
>>     }
>>
>>
>>     public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
>>         this.id = id;
>>         this.name = java.net.InetAddress.getLocalHost().getHostName();
>>         this.timeCounter = new FileWriter(new File("MyQueueConsumer_" + this.name + "_" + id + "_timeCounter.txt"));
>>
>>         // this.timeCounterThread = new FileWriterThread();
>>         // this.timeCounterThread.start();
>>         this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new QueueConsumer<CrawlUrl>() {
>>
>>             @Override
>>             public void stateChanged(CuratorFramework client, ConnectionState newState) {
>>                 System.out.println(String.format("[%s] connection state changed to %s", id, newState));
>>             }
>>
>>             @Override
>>             public void consumeMessage(CrawlUrl url) throws Exception {
>>                 try {
>>                     LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "[" + id + "-" + MyQueueConsumer.this.name + "] processed " + url.url);
>>                     MyQueueConsumer.this.numberOfProcessedURL++;
>>                 } catch (Exception e) {
>>                     LOG.error("[" + id + "-" + MyQueueConsumer.this.name + "]" + e.getMessage() + " for url " + url.url);
>>                 }
>>             }
>>
>>         });
>>         try {
>>             this.queue.start();
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>>     public static void main(String[] args) {
>>         try {
>>             CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>>
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>>
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>
>>             final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>>
>>             for (int i = 0; i < queueConsumers.length; i++) {
>>                 queueConsumers[i] = new MyQueueConsumer(framework, "id_" + i);
>>             }
>>
>>             Runtime.getRuntime().addShutdownHook(new Thread() {
>>                 @Override
>>                 public void run() {
>>                     // close workers
>>                     Throwable t = null;
>>                     LOG.info("We close the workers");
>>                     for (MyQueueConsumer queueConsumer : queueConsumers) {
>>                         try {
>>                             queueConsumer.close();
>>                         } catch (Throwable th) {
>>                             if (t == null) {
>>                                 t = th;
>>                             }
>>                         }
>>                     }
>>                     // throw first exception that we encountered
>>                     if (t != null) {
>>                         throw new RuntimeException("some workers failed to close", t);
>>                     }
>>                 }
>>             });
>>
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         this.queue.close();
>>     }
>> }
>>
>>
>>
>>
>> Main
>> -=-=-
>>
>> package com.zk;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.test.TestingServer;
>>
>>
>> public class QueueTestMain {
>>
>>     /**
>>      * @param args
>>      */
>>     public static void main(String[] args) {
>>         CrawlerPropertyFile props;
>>         try {
>>             props = new CrawlerPropertyFile(args[0]);
>>
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>>
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>
>>             if (args[1] != null && args[1].equalsIgnoreCase("true")) {
>>                 @SuppressWarnings("unused")
>>                 QueueProducer producer = new QueueProducer(framework);
>>             } else {
>>
>>                 final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>>
>>                 for (int i = 0; i < queueConsumers.length; i++) {
>>                     queueConsumers[i] = new MyQueueConsumer(framework, "id_" + i);
>>                 }
>>
>>                 Runtime.getRuntime().addShutdownHook(new Thread() {
>>                     @Override
>>                     public void run() {
>>                         // close workers
>>                         Throwable t = null;
>>                         for (MyQueueConsumer queueConsumer : queueConsumers) {
>>                             try {
>>                                 queueConsumer.close();
>>                             } catch (Throwable th) {
>>                                 if (t == null) {
>>                                     t = th;
>>                                 }
>>                             }
>>                         }
>>                         // throw first exception that we encountered
>>                         if (t != null) {
>>                             throw new RuntimeException("some workers failed to close", t);
>>                         }
>>                     }
>>                 });
>>
>>             }
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>> }
>>
>>
>>
>>
>> Example of output:
>>
>>
>>
>>
>> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <[email protected]> wrote:
>> Can you produce a test that shows this? Anything else interesting in the log? Of course, there could be a bug.
>>
>> -Jordan
>>
>> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <[email protected]> wrote:
>>
>> > Hi
>> >
>> > I made a short test as follows:
>> >
>> > - I have a quorum of 3 nodes for ZooKeeper.
>> > - I wrote a class using the Curator QueueProducer that produces items (random integers) all the time (when the queue is 10% full, it creates new items).
>> > - I wrote a simple class using the Curator QueueConsumer that simply prints "consumed item i" to the log.
>> >
>> > I tested some different combinations:
>> > - running the consumers on one, two or three nodes.
>> > - running one or more consumers in parallel on a given node.
>> >
>> >
>> > But, and here is my question: I see some very strange behavior when I have several consumers in parallel on a node. For example, running 5 consumers per node on 3 nodes, I see a **very** slow throughput. When looking at my log, I see that most of the consumers are idle most of the time....
>> >
>> > Am I making a mistake somewhere?
>> > I was expecting to increase the throughput by adding more consumers; I am surprised to see the opposite....
>> >
>> > Thanks a lot
>> >
>> > Benjamin
>
> <ir-hadoop4_log.txt>
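
For anyone trying to reproduce the setup: the Utils.newDistributedQueue helper used by both the producer and the consumer is never shown in the thread. A minimal sketch of what such a helper might look like is below, assuming it simply wraps Curator's standard QueueBuilder and sets a lock path. The path constants and the CrawlUrl serializer here are hypothetical placeholders, since neither Utils nor CrawlUrl is included in the posted code.

    // Hypothetical sketch of the Utils.newDistributedQueue helper referenced above.
    // Assumes Curator's public QueueBuilder API; the path values and the
    // CrawlUrl serializer are placeholders, not the original poster's code.
    import java.nio.charset.StandardCharsets;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.queue.DistributedQueue;
    import org.apache.curator.framework.recipes.queue.QueueBuilder;
    import org.apache.curator.framework.recipes.queue.QueueConsumer;
    import org.apache.curator.framework.recipes.queue.QueueSerializer;

    public final class Utils {

        public static final String CRAWL_QUEUE_PATH = "/crawl_queue";           // assumed value
        public static final String CRAWL_QUEUE_LOCK_PATH = "/crawl_queue_lock"; // assumed value

        // Builds a DistributedQueue on the given paths. Passing a null consumer
        // yields a producer-only queue, as in QueueProducer above.
        public static DistributedQueue<CrawlUrl> newDistributedQueue(CuratorFramework client,
                                                                      String queuePath,
                                                                      String lockPath,
                                                                      QueueConsumer<CrawlUrl> consumer) {
            QueueSerializer<CrawlUrl> serializer = new QueueSerializer<CrawlUrl>() {
                @Override
                public byte[] serialize(CrawlUrl item) {
                    return item.url.getBytes(StandardCharsets.UTF_8);
                }

                @Override
                public CrawlUrl deserialize(byte[] bytes) {
                    return new CrawlUrl(new String(bytes, StandardCharsets.UTF_8));
                }
            };
            return QueueBuilder.builder(client, consumer, serializer, queuePath)
                    .lockPath(lockPath)   // messages are locked while being consumed
                    .buildQueue();
        }

        private Utils() {
        }
    }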
