Awesome, Chris! I managed to do what I was trying to do. Thanks for the help, guys!

Renato M.

2014-11-05 22:53 GMT+01:00 Chris Riccomini <criccom...@linkedin.com.invalid>:

Hey Renato,

It seems to me that your consumer is not reading any messages. This seems concerning:

    2014-11-05 22:28:08 OrderFeed [ERROR] Communications link failure

    The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.

Regarding creating a new partition, you shouldn't need to do this, but I don't think it will cause any problems. Since your SystemFactory is using a single-partition SystemAdmin, Samza should only register partition 0 anyway.

Cheers,
Chris

On 11/5/14 1:48 PM, "Renato Marroquín Mogrovejo" <renatoj.marroq...@gmail.com> wrote:

Hi Chris,

I found out it was a silly mistake, but as soon as I saw the logs it was easy to track it down.
Now my consumer gets this and does not make any progress. I guess the database records are not getting into the partition. Could this be because I am creating my partition every time the register method is called?
Thanks again for the help! It is very much appreciated.

Renato M.

    2014-11-05 22:28:07 MetricsRegistryMap [DEBUG] Creating new counter org.apache.samza.system.SystemConsumersMetrics order-messages-per-poll.
    2014-11-05 22:28:07 MetricsRegistryMap [DEBUG] Add new counter org.apache.samza.system.SystemConsumersMetrics order-messages-per-poll 0.
    2014-11-05 22:28:08 OrderFeed [ERROR] Communications link failure

    The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
    2014-11-05 22:28:08 MetricsRegistryMap [DEBUG] Adding new gauge org.apache.samza.system.chooser.RoundRobinChooserMetrics buffered-messages 0.
    2014-11-05 22:28:08 SamzaContainer [INFO] Entering run loop.
    2014-11-05 22:28:08 SystemConsumers [DEBUG] Chooser returned null.
    2014-11-05 22:28:08 SystemConsumers [DEBUG] Refreshing chooser with new messages.
    2014-11-05 22:28:08 SystemConsumers [DEBUG] Polling system consumer: order
    2014-11-05 22:28:08 SystemConsumers [DEBUG] Getting fetch map for system: order
    2014-11-05 22:28:08 SystemConsumers [DEBUG] Fetching: Map(SystemStreamPartition [partition=Partition [partition=0], system=order, stream=order] -> 10000)
    2014-11-05 22:28:08 SystemConsumers [DEBUG] Got incoming message envelopes: []
    2014-11-05 22:28:08 TaskStorageManager [DEBUG] Flushing stores.
    2014-11-05 22:28:08 SystemProducers [DEBUG] Flushing source: Partition-0
    2014-11-05 22:28:08 KafkaSystemProducer [DEBUG] Flushing buffer with size: 0.
    2014-11-05 22:28:08 KafkaSystemProducer [INFO] Creating a new producer for system kafka.
    2014-11-05 22:28:08 KafkaSystemProducer [DEBUG] Created a new producer for system kafka.
    2014-11-05 22:28:08 DefaultEventHandler [DEBUG] Handling 0 events
    2014-11-05 22:28:08 KafkaSystemProducer [DEBUG] Flushed buffer.
    2014-11-05 22:28:08 OffsetManager [DEBUG] Skipping checkpointing for partition Partition [partition=0] because no checkpoint manager is defined.
    2014-11-05 22:28:08 SystemConsumers [DEBUG] Chooser returned null.
    2014-11-05 22:28:08 SystemConsumers [DEBUG] Chooser returned null.

2014-11-05 18:17 GMT+01:00 Chris Riccomini <criccom...@linkedin.com.invalid>:

Hey Renato,

Here's your problem:

    2014-11-05 15:10:14 SamzaContainer$ [INFO] Failed to create a consumer for order, so skipping.
    2014-11-05 15:10:14 SamzaContainer$ [INFO] Got system consumers: Set()

Samza is unable to instantiate your SystemConsumer. Turn the log level to debug so you can see the full stack trace.

Cheers,
Chris

From: Renato Marroquín Mogrovejo <renatoj.marroq...@gmail.com>
Reply-To: "dev@samza.incubator.apache.org" <dev@samza.incubator.apache.org>
Date: Wednesday, November 5, 2014 9:14 AM
To: "dev@samza.incubator.apache.org" <dev@samza.incubator.apache.org>
Subject: Re: How to create partitions from input

Hi Chris,

I am attaching them. Please let me know if they come through; if not, I can put them somewhere else.
Many thanks again!

Renato M.

2014-11-05 17:42 GMT+01:00 Chris Riccomini <criccom...@linkedin.com.invalid>:

Hey Renato,

It looks like your config is good. I'm wondering if your consumer is failing to get created. I'm looking for this line in the logs:

    "Failed to create a consumer for order, so skipping"

If the consumer creation fails, Samza will continue on and assume that you won't use the consumer. If you do use it, though, that can lead to an exception like the one you are seeing.

Can you attach the logs for the job?

Cheers,
Chris

On 11/5/14 8:15 AM, "Renato Marroquín Mogrovejo" <renatoj.marroq...@gmail.com> wrote:

Hi Chris,

This is what my config file looks like:

    # Job
    job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
    job.name=order-feed
    # YARN
    yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
    # Task
    task.class=samza.examples.order.task.OrderFeedStreamTask
    task.inputs=order.order
    # Serializers
    serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
    # Wikipedia System
    systems.order.samza.factory=samza.examples.order.system.OrderSystemFactory

The complete file is here:
https://github.com/renato2099/hello-samza/blob/master/samza-job-package/src/main/config/order-feed.properties

Thanks in advance for the help.

Renato M.

2014-11-05 17:06 GMT+01:00 Chris Riccomini <criccom...@linkedin.com.invalid>:

Hey Renato,

Could you send the config for your job?

The stack trace indicates that "task.inputs" names a system that isn't defined in the configuration. For example, if you have:

    task.inputs=kafka.my-topic

but you haven't defined a "kafka" system (systems.kafka.*), then you will see this exception.

Cheers,
Chris

On 11/5/14 6:14 AM, "Renato Marroquín Mogrovejo" <renatoj.marroq...@gmail.com> wrote:

Hi Yan,

I tried setting task.inputs to use the same key as the system's name, but I keep getting an error when I try to run it:

    2014-11-05 09:42:07 OffsetManager [INFO] Successfully loaded starting offsets: Map(SystemStreamPartition [partition=Partition [partition=0], system=order, stream=order] -> null)
    2014-11-05 09:42:07 SamzaContainer [INFO] Starting task instance stores.
    2014-11-05 09:42:07 SamzaContainer [INFO] Initializing stream tasks.
    2014-11-05 09:42:07 SamzaContainer [INFO] Registering task instances with producers.
    2014-11-05 09:42:07 SamzaContainer [INFO] Starting producer multiplexer.
    2014-11-05 09:42:07 SamzaContainer [INFO] Registering task instances with consumers.
    2014-11-05 09:42:07 SamzaContainer [ERROR] Caught exception in process loop.
    org.apache.samza.system.SystemConsumersException: can't register order's consumer.
        at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:164)
        at org.apache.samza.container.TaskInstance$anonfun$registerConsumers$2.apply(TaskInstance.scala:128)
        at org.apache.samza.container.TaskInstance$anonfun$registerConsumers$2.apply(TaskInstance.scala:124)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at org.apache.samza.container.TaskInstance.registerConsumers(TaskInstance.scala:124)
        at org.apache.samza.container.SamzaContainer$anonfun$startConsumers$2.apply(SamzaContainer.scala:577)
        at org.apache.samza.container.SamzaContainer$anonfun$startConsumers$2.apply(SamzaContainer.scala:577)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:577)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:501)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
    Caused by: java.util.NoSuchElementException: key not found: order
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:58)
        at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:165)
        ... 13 more

Maybe it has to do with me creating the partitions in the start method? I guess the register method gets called first, then start, and finally stop?
Any suggestions? Thank you very much for your help!

Renato M.
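The naming rule discussed in this thread is simple: each system named in task.inputs needs a matching systems.<system>.samza.factory entry. That rule can be checked before a job is submitted. The following is a minimal plain-Java sketch, not part of Samza. `TaskInputsCheck` and `findUndefinedSystems` are hypothetical names. The sketch assumes task.inputs is a comma-separated list in which the system name is the text before the first dot, as in order.order or kafka.my-topic. It only catches the configuration mistake. It cannot detect the other failure seen in this thread, where the system is defined but its consumer fails to instantiate.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for a Samza-style properties file; not part of Samza.
class TaskInputsCheck {

    // Returns each system named in task.inputs that has no
    // systems.<system>.samza.factory entry in the same config.
    static List<String> findUndefinedSystems(Properties config) {
        List<String> missing = new ArrayList<>();
        for (String input : config.getProperty("task.inputs", "").split(",")) {
            String entry = input.trim();
            if (entry.isEmpty()) {
                continue;
            }
            // Assumption: the system is the text before the first '.', so
            // "order.order" names system "order" and stream "order".
            // An entry without a dot is treated as a bare system name here.
            int dot = entry.indexOf('.');
            String system = dot < 0 ? entry : entry.substring(0, dot);
            boolean defined = config.getProperty("systems." + system + ".samza.factory") != null;
            if (!defined && !missing.contains(system)) {
                missing.add(system);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        String factory = "systems.order.samza.factory=samza.examples.order.system.OrderSystemFactory\n";

        Properties matching = new Properties();
        matching.load(new StringReader("task.inputs=order.order\n" + factory));

        Properties mismatched = new Properties();
        mismatched.load(new StringReader("task.inputs=ordersystem.order\n" + factory));

        System.out.println(findUndefinedSystems(matching));   // []
        System.out.println(findUndefinedSystems(mismatched)); // [ordersystem]
    }
}
```

The sketch can be run against the two task.inputs values tried in the thread. It reports nothing for order.order and reports ordersystem for ordersystem.order, because only the order system has a factory entry.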
2014-11-03 18:57 GMT+01:00 Yan Fang <yanfang...@gmail.com>:

Hi Renato,

> 1) For the task.inputs option, you told me that I should use something like "ordersystem.order". I guess I have to use ordersystem because of the factory name "OrderSystemFactory"? I tried using "order.order" but I got <o.a.s.s.SystemConsumersException: can't register order's consumer>, so I imagine there is a naming convention for classes, inputs, and factories?

1) There is no restriction on the system name; I just used that as an example. The system name in task.inputs only has to be the same as the system name in systems.%systemname.samza.factory. For example, in the following properties, the two names marked with asterisks should be the same:

    task.inputs=*foosystem*.tableName
    systems.*foosystem*.samza.factory=samza.examples.wikipedia.system.OrderSystemFactory

In your case, task.inputs=*order*.order, because systems.*order*.samza.factory=samza.examples.order.system.OrderSystemFactory.

> 2) I tried using the "ordersystem.order" name, but I kept getting the NoPartitions exception.

2) The code in your previous register method should be OK.

> 3) BlockingEnvelopeMap has a put method to put the incoming messages into the partition, and I am putting the table values in the start method. Is this bad practice?

3) It depends. Since the start method is only called once, the system will read the table only once and put all the records into the BlockingEnvelopeMap. For testing purposes, I think it is fine. You can have a look at our file reader system
<https://github.com/apache/incubator-samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala>,
which uses a thread to monitor the file.

> 4) WikipediaFeedStreamTask takes an envelope object and puts it into Kafka, but where does it get the envelope from? The consumer, right?

4) Yes, all the messages come from the put method in the consumer.

Thanks,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108

On Mon, Nov 3, 2014 at 1:38 AM, Renato Marroquín Mogrovejo <renatoj.marroq...@gmail.com> wrote:

Thanks for the help, Yan!
I think I understand more things now, but I also have more questions :)

1) For the task.inputs option, you told me that I should use something like "ordersystem.order". I guess I have to use ordersystem because of the factory name "OrderSystemFactory"? I tried using "order.order" but I got <o.a.s.s.SystemConsumersException: can't register order's consumer>, so I imagine there is a naming convention for classes, inputs, and factories?
2) I tried using the "ordersystem.order" name, but I kept getting the NoPartitions exception.
3) BlockingEnvelopeMap has a put method to put the incoming messages into the partition, and I am putting the table values in the start method. Is this bad practice?
   <https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/order/system/OrderConsumer.java>
4) WikipediaFeedStreamTask takes an envelope object and puts it into Kafka, but where does it get the envelope from? The consumer, right?

Thanks again, Yan!

Renato M.

2014-11-03 3:18 GMT+01:00 Yan Fang <yanfang...@gmail.com>:

Hi Renato,

> In the hello-samza example, for each new incoming message that belongs to a channel, a new partition is created, right?

-- In the WikipediaFeed job of hello-samza, each channel is actually treated as one stream of the Wikipedia system, and each stream has only partition 0. This is the code:

    SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, event.getChannel(), new Partition(0));

So in your code, I think each table could be thought of as one stream with only partition 0. It would look like task.inputs=ordersystem.tableName.

-- You may also be confused by what happens in Kafka. All the messages in the wikipedia-raw topic are sent there by WikipediaFeedStreamTask
<https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java>.

> Exception in thread "main" org.apache.samza.SamzaException: No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:77)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

-- Is the stream registered with the system?

    SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, "order", new Partition(0));

Is "order" registered, e.g. via task.inputs=ordersystem.order?

Let me know if you have other questions.

Thanks,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108

On Sun, Nov 2, 2014 at 4:56 AM, Renato Marroquín Mogrovejo <renatoj.marroq...@gmail.com> wrote:

Hello Samza team,

I am trying to modify the hello-samza example application to replay events that are stored in a table, but I am having some trouble.
In the hello-samza example, for each new incoming message that belongs to a channel, a new partition is created, right?
Now in my case, how (where) do I create these partitions? I create them in [1], but I am almost sure that is wrong because I keep getting an exception saying that there are no partitions for this task. Ideally, I would like to create partitions based on the keys I am reading from the table.
Could anybody help me with this, please? Many thanks in advance!

Renato M.

    Exception in thread "main" org.apache.samza.SamzaException: No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:77)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

[1] https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/order/system/OrderConsumer.java#L47
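Yan's suggestion has three parts: treat each table as one stream that only has partition 0, register it through task.inputs, and feed the rows to the consumer's buffer. This can be modeled without Samza on the classpath. The sketch below is plain Java with hypothetical names: `TableReplayModel` is the model class, and a map of rows stands in for the database query. A real consumer would instead extend org.apache.samza.util.BlockingEnvelopeMap and call put(...) with a SystemStreamPartition built with new Partition(0), as in the hello-samza line Yan quotes. The model reads the table once on a background thread, roughly in the spirit of FileReaderSystemConsumer's reader thread. It ignores rows for tables that were never registered; that behavior is a choice made for this model, not a claim about Samza.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java model of a table-replaying consumer; no Samza classes are used.
class TableReplayModel {
    private final String systemName;
    // One buffer per (system, stream, partition 0); stands in for SystemStreamPartition.
    private final Map<String, BlockingQueue<String>> buffers = new ConcurrentHashMap<>();
    private Thread reader;

    TableReplayModel(String systemName) {
        this.systemName = systemName;
    }

    // Each table is one stream with a single partition, always partition 0.
    private String key(String table) {
        return systemName + "." + table + "#0";
    }

    // Like register(): create the buffer for a stream named in task.inputs.
    void register(String table) {
        buffers.putIfAbsent(key(table), new LinkedBlockingQueue<>());
    }

    // Like start(): read the (hypothetical) table rows once on a background thread.
    void start(Map<String, List<String>> rowsByTable) {
        reader = new Thread(() -> {
            for (Map.Entry<String, List<String>> entry : rowsByTable.entrySet()) {
                BlockingQueue<String> buffer = buffers.get(key(entry.getKey()));
                if (buffer == null) {
                    continue; // table was never registered, so nothing will read it
                }
                for (String row : entry.getValue()) {
                    try {
                        buffer.put(row);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "table-reader");
        reader.start();
    }

    // Demo helper: wait until the one-shot read has finished.
    void awaitReaderDone() {
        if (reader == null) {
            return;
        }
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Like poll(): return everything currently buffered for one table, in FIFO order.
    List<String> poll(String table, long timeoutMs) {
        List<String> out = new ArrayList<>();
        BlockingQueue<String> buffer = buffers.get(key(table));
        if (buffer == null) {
            return out;
        }
        try {
            String first = buffer.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                out.add(first);
                buffer.drainTo(out);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        TableReplayModel consumer = new TableReplayModel("order");
        consumer.register("order"); // corresponds to task.inputs=order.order
        Map<String, List<String>> rows = new HashMap<>();
        rows.put("order", Arrays.asList("order-1", "order-2"));
        rows.put("customer", Arrays.asList("customer-1")); // not registered
        consumer.start(rows);
        consumer.awaitReaderDone();
        System.out.println(consumer.poll("order", 100));    // [order-1, order-2]
        System.out.println(consumer.poll("customer", 100)); // []
    }
}
```

Running main prints the two buffered order rows, then an empty list for the unregistered customer table. Reading on a separate thread lets start() return right away instead of running the whole query itself. A consumer meant to pick up rows added later would keep re-querying in that thread rather than stopping after one pass.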