Hi Chris,

It turned out to be a silly mistake, and once I saw the logs it was easy to track down. Now my consumer produces the log output below and does not make any progress. I guess the database records are not getting into the partition. Could this be because I am creating my partition every time the register method is called?

Thanks again for the help! It is very much appreciated.
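To make the register question concrete, here is a minimal sketch of a register step that is safe to call more than once. It uses plain Java collections as a stand-in and none of the Samza classes: `PartitionBuffers`, its methods, and the string partition key are hypothetical names for illustration only, and this is not how BlockingEnvelopeMap is implemented.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a per-partition message buffer (not a Samza class).
public class PartitionBuffers {
    // One queue per partition key, e.g. "order/order/0" (an assumed naming scheme).
    private final Map<String, BlockingQueue<String>> buffers = new ConcurrentHashMap<>();

    /** Safe to call repeatedly: the queue is created only on the first registration. */
    public void register(String partitionKey) {
        buffers.computeIfAbsent(partitionKey, k -> new LinkedBlockingQueue<>());
    }

    /** Buffers a record; fails loudly if the partition was never registered. */
    public void put(String partitionKey, String record) {
        BlockingQueue<String> queue = buffers.get(partitionKey);
        if (queue == null) {
            throw new IllegalStateException("not registered: " + partitionKey);
        }
        queue.add(record);
    }

    /** Number of records currently buffered for the partition (0 if unknown). */
    public int buffered(String partitionKey) {
        BlockingQueue<String> queue = buffers.get(partitionKey);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        PartitionBuffers buffers = new PartitionBuffers();
        buffers.register("order/order/0");
        buffers.put("order/order/0", "row-1");
        buffers.register("order/order/0"); // second call must not discard row-1
        System.out.println(buffers.buffered("order/order/0")); // prints 1
    }
}
```

If a register step instead replaced the queue on every call, anything buffered before the second call would be lost. Whether that is what happens in the consumer here is only a guess.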
Renato M.

2014-11-05 22:28:07 MetricsRegistryMap [DEBUG] Creating new counter org.apache.samza.system.SystemConsumersMetrics order-messages-per-poll.
2014-11-05 22:28:07 MetricsRegistryMap [DEBUG] Add new counter org.apache.samza.system.SystemConsumersMetrics order-messages-per-poll 0.
2014-11-05 22:28:08 OrderFeed [ERROR] Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
2014-11-05 22:28:08 MetricsRegistryMap [DEBUG] Adding new gauge org.apache.samza.system.chooser.RoundRobinChooserMetrics buffered-messages 0.
2014-11-05 22:28:08 SamzaContainer [INFO] Entering run loop.
2014-11-05 22:28:08 SystemConsumers [DEBUG] Chooser returned null.
2014-11-05 22:28:08 SystemConsumers [DEBUG] Refreshing chooser with new messages.
2014-11-05 22:28:08 SystemConsumers [DEBUG] Polling system consumer: order
2014-11-05 22:28:08 SystemConsumers [DEBUG] Getting fetch map for system: order
2014-11-05 22:28:08 SystemConsumers [DEBUG] Fetching: Map(SystemStreamPartition [partition=Partition [partition=0], system=order, stream=order] -> 10000)
2014-11-05 22:28:08 SystemConsumers [DEBUG] Got incoming message envelopes: []
2014-11-05 22:28:08 TaskStorageManager [DEBUG] Flushing stores.
2014-11-05 22:28:08 SystemProducers [DEBUG] Flushing source: Partition-0
2014-11-05 22:28:08 KafkaSystemProducer [DEBUG] Flushing buffer with size: 0.
2014-11-05 22:28:08 KafkaSystemProducer [INFO] Creating a new producer for system kafka.
2014-11-05 22:28:08 KafkaSystemProducer [DEBUG] Created a new producer for system kafka.
2014-11-05 22:28:08 DefaultEventHandler [DEBUG] Handling 0 events
2014-11-05 22:28:08 KafkaSystemProducer [DEBUG] Flushed buffer.
2014-11-05 22:28:08 OffsetManager [DEBUG] Skipping checkpointing for partition Partition [partition=0] because no checkpoint manager is defined.
2014-11-05 22:28:08 SystemConsumers [DEBUG] Chooser returned null.
2014-11-05 22:28:08 SystemConsumers [DEBUG] Chooser returned null.

2014-11-05 18:17 GMT+01:00 Chris Riccomini <[email protected]>:

> Hey Renato,
>
> Here's your problem:
>
> 2014-11-05 15:10:14 SamzaContainer$ [INFO] Failed to create a consumer for order, so skipping.
> 2014-11-05 15:10:14 SamzaContainer$ [INFO] Got system consumers: Set()
>
> Samza is unable to instantiate your SystemConsumer. Turn the log-level to
> debug, so you can see the full stack trace.
>
> Cheers,
> Chris
>
> From: Renato Marroquín Mogrovejo <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Wednesday, November 5, 2014 9:14 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: How to create partitions from input
>
> Hi Chris,
>
> I am attaching them. Please let me know if they go through if not I can
> try putting them somewhere else.
> Many thanks again!
>
>
> Renato M.
>
> 2014-11-05 17:42 GMT+01:00 Chris Riccomini <[email protected]>:
> Hey Renato,
>
> It looks like your config is good. I'm wondering if your consumer is
> failing to get created. I'm looking for this line in the logs:
>
> "Failed to create a consumer for order, so skipping"
>
>
> If the consumer creation fails, Samza will continue on, and assume that
> you won't use the consumer, but if you do, it could lead to an exception
> like you have.
>
> Can you attach the logs to the job?
>
>
> Cheers,
> Chris
>
> On 11/5/14 8:15 AM, "Renato Marroquín Mogrovejo" <[email protected]> wrote:
>
> >Hi Chris,
> >
> >This is what my config file looks like:
> >
> ># Job
> >job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >job.name=order-feed
> ># YARN
> >yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
> ># Task
> >task.class=samza.examples.order.task.OrderFeedStreamTask
> >task.inputs=order.order
> ># Serializers
> >serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> ># Wikipedia System
> >systems.order.samza.factory=samza.examples.order.system.OrderSystemFactory
> >
> >The complete file is in here
> >https://github.com/renato2099/hello-samza/blob/master/samza-job-package/src/main/config/order-feed.properties
> >Thanks in advance for the help.
> >
> >
> >Renato M.
> >
> >2014-11-05 17:06 GMT+01:00 Chris Riccomini <[email protected]>:
> >>
> >> Hey Renato,
> >>
> >> Could you send the config for your job?
> >>
> >> The stack trace is indicating that you are using a system "task.inputs"
> >> that isn't defined in the configuration. For example, if you have:
> >>
> >> task.inputs=kafka.my-topic
> >>
> >> But, you haven't defined a "kafka" system (systems.kafka.*) then you will
> >> see this exception.
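The mismatch described just above (a system named in task.inputs with no matching systems.<name>.* definition) can be checked before submitting a job. The following is a minimal sketch using only java.util.Properties. `TaskInputsCheck` and `undefinedInputs` are hypothetical names, not part of Samza. The sketch also assumes that the system name is everything before the first dot of each task.inputs entry and that a defined system always has a systems.<name>.samza.factory key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Samza: flags task.inputs entries whose
// system has no systems.<name>.samza.factory key in the same config.
public class TaskInputsCheck {

    public static List<String> undefinedInputs(Properties config) {
        List<String> missing = new ArrayList<>();
        String inputs = config.getProperty("task.inputs", "");
        for (String input : inputs.split(",")) {
            String trimmed = input.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate empty entries and trailing commas
            }
            // Assumption: the system name is the part before the first dot.
            int dot = trimmed.indexOf('.');
            String system = dot < 0 ? trimmed : trimmed.substring(0, dot);
            if (config.getProperty("systems." + system + ".samza.factory") == null) {
                missing.add(trimmed);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("task.inputs", "order.order,kafka.my-topic");
        config.setProperty("systems.order.samza.factory",
                "samza.examples.order.system.OrderSystemFactory");
        System.out.println(undefinedInputs(config)); // prints [kafka.my-topic]
    }
}
```

A check like this only covers the naming mismatch. It would not catch a factory that is configured correctly but fails to create its consumer at startup.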
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 11/5/14 6:14 AM, "Renato Marroquín Mogrovejo" <[email protected]> wrote:
> >>
> >> >Hi Yang,
> >> >
> >> >I tried setting the task.input to have the same key as the system's name,
> >> >but I keep on getting error while trying to run it:
> >> >
> >> >2014-11-05 09:42:07 OffsetManager [INFO] Successfully loaded starting offsets: Map(SystemStreamPartition [partition=Partition [partition=0], system=order, stream=order] -> null)
> >> >2014-11-05 09:42:07 SamzaContainer [INFO] Starting task instance stores.
> >> >2014-11-05 09:42:07 SamzaContainer [INFO] Initializing stream tasks.
> >> >2014-11-05 09:42:07 SamzaContainer [INFO] Registering task instances with producers.
> >> >2014-11-05 09:42:07 SamzaContainer [INFO] Starting producer multiplexer.
> >> >2014-11-05 09:42:07 SamzaContainer [INFO] Registering task instances with consumers.
> >> >2014-11-05 09:42:07 SamzaContainer [ERROR] Caught exception in process loop.
> >> >org.apache.samza.system.SystemConsumersException: can't register order's consumer.
> >> >    at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:164)
> >> >    at org.apache.samza.container.TaskInstance$anonfun$registerConsumers$2.apply(TaskInstance.scala:128)
> >> >    at org.apache.samza.container.TaskInstance$anonfun$registerConsumers$2.apply(TaskInstance.scala:124)
> >> >    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> >> >    at org.apache.samza.container.TaskInstance.registerConsumers(TaskInstance.scala:124)
> >> >    at org.apache.samza.container.SamzaContainer$anonfun$startConsumers$2.apply(SamzaContainer.scala:577)
> >> >    at org.apache.samza.container.SamzaContainer$anonfun$startConsumers$2.apply(SamzaContainer.scala:577)
> >> >    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> >    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> >    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >> >    at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:577)
> >> >    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:501)
> >> >    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
> >> >    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> >Caused by: java.util.NoSuchElementException: key not found: order
> >> >    at scala.collection.MapLike$class.default(MapLike.scala:228)
> >> >    at scala.collection.AbstractMap.default(Map.scala:58)
> >> >    at scala.collection.MapLike$class.apply(MapLike.scala:141)
> >> >    at scala.collection.AbstractMap.apply(Map.scala:58)
> >> >    at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:165)
> >> >    ... 13 more
> >> >
> >> >
> >> >
> >> >Maybe it has to do with me creating the partitions on the start method?
> >> >I guess the register method gets called first, then the start and finally
> >> >the stop?
> >> >Any suggestions? Thank you very much for your help!
> >> >
> >> >
> >> >Renato M.
> >> >
> >> >2014-11-03 18:57 GMT+01:00 Yan Fang <[email protected]>:
> >> >
> >> >> Hi Renato,
> >> >>
> >> >> 1) In the task.inputs option you told me that I should use something like
> >> >> "ordersystem.order", I guess I have to use ordersystem because of the
> >> >> factory name "OrderSystemFactory"? I tried using "order.order" but I got an
> >> >> <o.a.s.s.SystemConsumersException: can't register order's consumer> So I
> >> >> imagine there is a naming convention for classes, inputs, and factories?
> >> >>
> >> >> 1) There is no restriction in the system name. I just used that as an
> >> >> example. As long as the system name in task.inputs is the same as the
> >> >> system name in systems.%systemname.samza.factory. For example, in the
> >> >> following properties, the two bold names should be the same.
> >> >> task.inputs=*foosystem*.tableName,
> >> >> systems.*foosystem*.samza.factory=samza.examples.wikipedia.system.OrderSystemFactory
> >> >>
> >> >> In your case, the task.inputs=*order*.order , because
> >> >> systems.*order*.samza.factory=samza.examples.order.system.OrderSystemFactory
> >> >>
> >> >> 2) I tried using the "ordersystem.order" name but I kept on getting the
> >> >> NoPartitions exception
> >> >>
> >> >> 2) The code in your previous register method should be ok.
> >> >>
> >> >> 3) The BlockingEnvelopeMap has a put method to put the incoming messages
> >> >> into the partition, and I am putting the table values on the start method,
> >> >> is this a bad practice?
> >> >>
> >> >> 3) It depends. Since the start method is only called once, the system
> >> >> will only read the table once and put all the records into the
> >> >> BlockingEnvelop.
> >> >> For the testing purpose, I think it is fine. You can have
> >> >> a look at our filesystem
> >> >> <https://github.com/apache/incubator-samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala>,
> >> >> which uses a thread to monitor the file.
> >> >>
> >> >> 4) WikipediaFeedStreamTask takes an envelope object and puts that to Kafka,
> >> >> but where does it get the envelope from? the consumer right?
> >> >>
> >> >> 4) Yes, all the messages are from the put method in the consumer.
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Fang, Yan
> >> >> [email protected]
> >> >> +1 (206) 849-4108
> >> >>
> >> >> On Mon, Nov 3, 2014 at 1:38 AM, Renato Marroquín Mogrovejo <[email protected]> wrote:
> >> >>
> >> >> > Thanks for the help Yang!
> >> >> > I think I understand more things now, but I also have more questions :)
> >> >> >
> >> >> > 1) In the task.inputs option you told me that I should use something like
> >> >> > "ordersystem.order", I guess I have to use ordersystem because of the
> >> >> > factory name "OrderSystemFactory"? I tried using "order.order" but I got an
> >> >> > <o.a.s.s.SystemConsumersException: can't register order's consumer> So I
> >> >> > imagine there is a naming convention for classes, inputs, and factories?
> >> >> > 2) I tried using the "ordersystem.order" name but I kept on getting the
> >> >> > NoPartitions exception
> >> >> > 3) The BlockingEnvelopeMap has a put method to put the incoming messages
> >> >> > into the partition, and I am putting the table values on the start method,
> >> >> > is this a bad practice?
> >> >> > <https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/order/system/OrderConsumer.java>
> >> >> > 4) WikipediaFeedStreamTask takes an envelope object and puts that to Kafka,
> >> >> > but where does it get the envelope from? the consumer right?
> >> >> >
> >> >> > Thanks again Yang!
> >> >> >
> >> >> >
> >> >> > Renato M.
> >> >> >
> >> >> > 2014-11-03 3:18 GMT+01:00 Yan Fang <[email protected]>:
> >> >> >
> >> >> > > Hi Renato,
> >> >> > >
> >> >> > > on the hello-samza example for each new incoming message which belongs
> >> >> > > to a channel, a new partition is created, right?
> >> >> > >
> >> >> > > -- In the WikipediaFeed job of hello-samza, each channel is actually treated
> >> >> > > as one stream of the Wikipedia System. Each stream has only partition 0.
> >> >> > > This is the code:
> >> >> > > SystemStreamPartition systemStreamPartition = new
> >> >> > > SystemStreamPartition(systemName,
> >> >> > > event.getChannel(), new Partition(0));
> >> >> > >
> >> >> > > So in your code, I think each table could be thought of as one stream, with
> >> >> > > only partition 0. So it will be like task.inputs=ordersystem.tableName
> >> >> > >
> >> >> > > -- Then you may be confused by what happened in Kafka. All the messages
> >> >> > > sent to the wikipedia-raw topic are sent in WikipediaFeedStreamTask
> >> >> > > <https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java>.
> >> >> > >
> >> >> > > Exception in thread "main" org.apache.samza.SamzaException: No partitions
> >> >> > > for this task. Can't run a task without partition assignments.
> >> >> > > It's likely
> >> >> > > that the partition manager for this system doesn't know about the stream
> >> >> > > you're trying to read.
> >> >> > >     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:77)
> >> >> > >     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> >> > >
> >> >> > > -- Is the stream registered to the system?
> >> >> > > SystemStreamPartition systemStreamPartition = new
> >> >> > > SystemStreamPartition(systemName,
> >> >> > > "order", new Partition(0));
> >> >> > > Is the "order" stream registered, such as task.inputs=ordersystem.order?
> >> >> > >
> >> >> > > Let me know if you have other questions.
> >> >> > >
> >> >> > > Thanks,
> >> >> > >
> >> >> > > Fang, Yan
> >> >> > > [email protected]
> >> >> > > +1 (206) 849-4108
> >> >> > >
> >> >> > > On Sun, Nov 2, 2014 at 4:56 AM, Renato Marroquín Mogrovejo <[email protected]> wrote:
> >> >> > >
> >> >> > > > Hello Samza team,
> >> >> > > >
> >> >> > > >
> >> >> > > > I am trying to modify the hello-samza example application to replay events
> >> >> > > > which are in a table. But I am having some trouble.
> >> >> > > > So on the hello-samza example for each new incoming message which belongs
> >> >> > > > to a channel, a new partition is created, right?
> >> >> > > > Now in my case, how (where) do I create these partitions? I create them in
> >> >> > > > [1] but I am almost sure that is wrong because I keep getting the exception
> >> >> > > > saying that there are no partitions for this task. I mean ideally I would
> >> >> > > > like to create partitions based on the keys I am reading from the table.
> >> >> > > > Could anybody help me on this task please? Many thanks in advance!
> >> >> > > >
> >> >> > > >
> >> >> > > > Renato M.
> >> >> > > >
> >> >> > > >
> >> >> > > > Exception in thread "main" org.apache.samza.SamzaException: No partitions
> >> >> > > > for this task. Can't run a task without partition assignments. It's likely
> >> >> > > > that the partition manager for this system doesn't know about the stream
> >> >> > > > you're trying to read.
> >> >> > > >     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:77)
> >> >> > > >     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> >> > > >
> >> >> > > >
> >> >> > > > [1]
> >> >> > > > https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/order/system/OrderConsumer.java#L47
