Make sure systems.kafka.samza.msg.serde=string
Fang, Yan
[email protected]
+1 (206) 849-4108


On Wed, Mar 19, 2014 at 2:21 PM, <[email protected]> wrote:

> Hmm, so I read the input, which is a String, and want to write to the output
> topic also as a string. So I used the StringSerdeFactory. But my Samza job
> complains and wants the JsonSerdeFactory:
>
> Exception in thread "main" org.apache.samza.SamzaException: Serde json for system kafka does not exist in configuration.
>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>     at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>     at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:185)
>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:183)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>     at scala.collection.AbstractSet.map(Set.scala:47)
>     at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:183)
>     at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:180)
>     at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:207)
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>
> Why is that?
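The exception above means the job's configuration tells system `kafka` to use a serde named `json`, while no serde with that name is registered. Yan's fix switches the system to the string serde, which in turn assumes a registration such as `serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory` alongside `systems.kafka.samza.msg.serde=string`. The Java sketch below is a hypothetical checker, not Samza's own code (`SerdeConfigCheck` and `missingSerdes` are made-up names): it applies the same name lookup to a plain map of config properties and reports every system whose message serde is not registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical config checker, NOT Samza's implementation: it mirrors the
// lookup suggested by the stack trace, where each system's message serde name
// must match a registered serializers.registry.<name>.class entry.
final class SerdeConfigCheck {
    private static final String PREFIX = "systems.";
    private static final String SUFFIX = ".samza.msg.serde";

    // Returns one error message per system whose message serde is not registered.
    static List<String> missingSerdes(Map<String, String> config) {
        List<String> errors = new ArrayList<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(PREFIX) || !key.endsWith(SUFFIX)
                    || key.length() <= PREFIX.length() + SUFFIX.length()) {
                continue;
            }
            String system = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
            if (system.contains(".")) {
                continue; // skip per-stream keys such as systems.kafka.streams.<name>.samza.msg.serde
            }
            String serde = entry.getValue();
            if (!config.containsKey("serializers.registry." + serde + ".class")) {
                errors.add("Serde " + serde + " for system " + system
                        + " does not exist in configuration.");
            }
        }
        return errors;
    }
}
```

With `systems.kafka.samza.msg.serde=json` and only the string serde registered, `missingSerdes` returns the same text as the exception; changing the value to `string` makes the list empty. Key serdes and per-stream overrides are deliberately left out of this sketch.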
>
> -----Original Message-----
> From: Chris Riccomini [mailto:[email protected]]
> Sent: Wednesday, March 19, 2014 12:40 PM
> To: [email protected]
> Subject: Re: Writing my Custom Job
>
> Hey Guys,
>
> This is a transitive dependency from Kafka. My guess is that the Kafka pom
> files published to Maven are not quite working properly. We've already had
> some issues with them. Yan's suggestion is an appropriate fix. The only
> thing I'd add is that you can make it a runtime dependency:
>
>     <scope>runtime</scope>
>
> Cheers,
> Chris
>
> On 3/19/14 12:32 PM, "Yan Fang" <[email protected]> wrote:
>
> >Hi Sonali,
> >
> >I am not sure how Chris will respond. My quick fix for this problem
> >was simply adding
> >
> >  <dependency>
> >    <groupId>com.yammer.metrics</groupId>
> >    <artifactId>metrics-core</artifactId>
> >    <version>2.2.0</version>
> >  </dependency>
> >
> >to the pom.xml file when compiling...
> >
> >Hope that will help.
> >
> >Thanks,
> >
> >Fang, Yan
> >[email protected]
> >+1 (206) 849-4108
> >
> >
> >On Wed, Mar 19, 2014 at 11:23 AM,
> ><[email protected]> wrote:
> >
> >> So I downloaded and built the latest Samza code.
> >> I get this error:
> >>
> >> Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
> >>     at kafka.metrics.KafkaMetricsGroup$class.newTimer(Unknown Source)
> >>     at kafka.producer.ProducerRequestMetrics.newTimer(Unknown Source)
> >>     at kafka.producer.ProducerRequestMetrics.<init>(Unknown Source)
> >>     at kafka.producer.ProducerRequestStats.<init>(Unknown Source)
> >>     at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
> >>     at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
> >>     at kafka.utils.Pool.getAndMaybePut(Unknown Source)
> >>     at kafka.producer.ProducerRequestStatsRegistry$.getProducerRequestStats(Unknown Source)
> >>     at kafka.producer.SyncProducer.<init>(Unknown Source)
> >>     at kafka.producer.ProducerPool$.createSyncProducer(Unknown Source)
> >>     at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
> >>     at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
> >>     at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
> >>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:208)
> >>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
> >>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
> >>     at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:54)
> >>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:146)
> >>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
> >>     at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:98)
> >>     at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:86)
> >>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> >>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> >>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> >>     at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:86)
> >>     at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
> >>     at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:78)
> >>     at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> >> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
> >>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> >>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> >>     at java.security.AccessController.doPrivileged(Native Method)
> >>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> >>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> >>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> >>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> >>     ... 30 more
> >>
> >> -----Original Message-----
> >> From: Chris Riccomini [mailto:[email protected]]
> >> Sent: Tuesday, March 18, 2014 8:06 PM
> >> To: [email protected]
> >> Subject: Re: Writing my Custom Job
> >>
> >> Hey Sonali,
> >>
> >> Line 65 currently looks like:
> >>
> >>   val amClient = new AMRMClientImpl[ContainerRequest]
> >>
> >> There is no call to Option().get, which is what would trigger this
> >> exception. Are you running the latest Samza code from the master branch?
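Back to the `NoClassDefFoundError` trace above: the `Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics` line means the JVM running `SamzaAppMaster` could not find that class at runtime, which matches Chris's diagnosis of a transitive dependency missing from Kafka's published pom. One way to confirm that a fix such as Yan's `metrics-core` 2.2.0 dependency actually reached the runtime classpath is a check like the hypothetical `ClasspathCheck` class below. It is a diagnostic sketch that uses only the JDK; it is not part of Samza or Kafka.

```java
// Minimal diagnostic sketch (hypothetical class, JDK only): reports whether a
// class can be loaded from the current classpath, which is exactly what failed
// for com.yammer.metrics.Metrics in the trace.
final class ClasspathCheck {
    static boolean isLoadable(String className) {
        try {
            // initialize=false: locate and load the class without running static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError raised for a missing dependency
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "com.yammer.metrics.Metrics";
        System.out.println(name + (isLoadable(name) ? " is" : " is NOT") + " on the classpath");
    }
}
```

Run it against the same jars the job is packaged with; once the dependency is included, it should report the class as present.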
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 3/18/14 5:11 PM, "[email protected]"
> >> <[email protected]> wrote:
> >>
> >> >Has this happened to anyone before?
> >> >Exception in thread "main" java.util.NoSuchElementException: None.get
> >> >    at scala.None$.get(Option.scala:313)
> >> >    at scala.None$.get(Option.scala:311)
> >> >    at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:65)
> >> >    at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> >> >
> >> >-----Original Message-----
> >> >From: Garry Turkington [mailto:[email protected]]
> >> >Sent: Tuesday, March 18, 2014 4:16 PM
> >> >To: [email protected]
> >> >Subject: RE: Writing my Custom Job
> >> >
> >> >Hi,
> >> >
> >> >1. Specifically, since I'm using Kafka I don't have to write
> >> >Consumer and SystemFactory classes? Correct?
> >> >
> >> >Correct.
> >> >
> >> >2. For the SamzaStreamTask would the input be a String? i.e.
> >> >   String event = (String) envelope.getMessage();
> >> >
> >> >Correct -- just make sure you have the serdes on the system/stream
> >> >configured as string.
> >> >
> >> >Garry
> >> >
> >> >-----Original Message-----
> >> >From: [email protected]
> >> >[mailto:[email protected]]
> >> >Sent: 18 March 2014 22:57
> >> >To: [email protected]
> >> >Subject: RE: Writing my Custom Job
> >> >
> >> >Hey Chris,
> >> >
> >> >Thanks for the quick response!
> >> >
> >> >So the thing is, we have the Kafka producer independent of Samza.
> >> >The idea is to test the Kafka streams with different CEPs (Storm, for
> >> >example). That's why I have a separate Kafka job running that reads
> >> >from a file and writes to a Kafka topic.
> >> >
> >> >So assuming there is a topic, say "input-topic" (where the message is
> >> >some event of type "String" and the key is the actual eventId of the
> >> >event), already in place, I want to write a SamzaStreamTask that will
> >> >read this string, parse it and write to another Kafka topic.
> >> >In other words, Job1 is already done independent of Samza. I'm working
> >> >on Job2 using Samza.
> >> >
> >> >1. Specifically, since I'm using Kafka I don't have to write
> >> >Consumer and SystemFactory classes? Correct?
> >> >2. For the SamzaStreamTask would the input be a String? i.e.
> >> >   String event = (String) envelope.getMessage();
> >> >
> >> >Thanks!
> >> >Sonali
> >> >
> >> >-----Original Message-----
> >> >From: Chris Riccomini [mailto:[email protected]]
> >> >Sent: Tuesday, March 18, 2014 3:16 PM
> >> >To: [email protected]
> >> >Subject: Re: Writing my Custom Job
> >> >
> >> >Hey Sonali,
> >> >
> >> >1. For CSV file reading, you should check this JIRA out:
> >> >
> >> >   https://issues.apache.org/jira/browse/SAMZA-138
> >> >
> >> >2. You don't need to write to a Kafka topic using the standard Kafka
> >> >producer. You can use the collector that comes as part of the
> >> >process method. Take a look at one of the hello-samza examples to
> >> >see how this is done. (collector.send(...))
> >> >
> >> >3. To parse the string, retrieve specific fields, etc., you should
> >> >write a second StreamTask that reads from the first. The flow should
> >> >look like:
> >> >
> >> >   <file> -> Job 1 -> Kafka topic 1 -> Job 2 -> Kafka topic 2
> >> >
> >> >Where "Job 1" sends messages to "Kafka topic 1" partitioned by event
> >> >ID, and "Job 2" parses and retrieves specific fields, and produces
> >> >to "Kafka topic 2".
> >> >
> >> >Cheers,
> >> >Chris
> >> >
> >> >On 3/18/14 2:48 PM, "[email protected]"
> >> ><[email protected]> wrote:
> >> >
> >> >>Hey Guys,
> >> >>
> >> >>So I'm writing my custom job in Samza and wanted to make sure I'm
> >> >>not re-inventing the wheel.
> >> >>
> >> >>I have a Kafka job running that reads from a CSV file and writes to
> >> >>a topic. I wrote this using the Kafka producer API independent of
> >> >>Samza. The output is a KeyedMessage with key being my eventId and the
> >> >>value is a string corresponding to my event.
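Chris's point about partitioning by event ID, and the choice of eventId as the message key, come down to one property: a keyed producer maps equal keys to the same partition, so every message for a given event ID lands in one partition of "Kafka topic 1" and is consumed by a single task instance of Job 2. The sketch below illustrates that with a simple hash-mod function in the spirit of Kafka 0.8's default partitioner (non-negative key hash modulo the partition count); treat it as an illustration, not a copy of Kafka's code. `KeyPartitioning` and `partitionFor` are hypothetical names.

```java
// Illustration only, with hypothetical names: hash-mod partitioning similar in
// spirit to Kafka 0.8's default partitioner, not a copy of Kafka's code.
final class KeyPartitioning {
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so a negative hashCode() still maps into [0, numPartitions).
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

The sign-bit mask matters because Java's `%` keeps the sign of a negative `hashCode()`, and `Math.abs(Integer.MIN_VALUE)` is still negative.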
> >> >>
> >> >>Now, I want to write a SamzaConsumer that listens on my topic,
> >> >>parses the string to retrieve specific fields I'm interested in and
> >> >>writes them out to a different Kafka topic.
> >> >>
> >> >>Are there existing classes I can leverage to do this?
> >> >>
> >> >>Thanks,
> >> >>Sonali
> >> >>
> >> >>Sonali Parthasarathy
> >> >>R&D Developer, Data Insights
> >> >>Accenture Technology Labs
> >> >>703-341-7432
> >> >>
> >> >>
> >> >>________________________________
> >> >>
> >> >>This message is for the designated recipient only and may contain
> >> >>privileged, proprietary, or otherwise confidential information. If
> >> >>you have received it in error, please notify the sender immediately
> >> >>and delete the original. Any other use of the e-mail by you is prohibited.
> >> >>Where allowed by local law, electronic communications with
> >> >>Accenture and its affiliates, including e-mail and instant
> >> >>messaging (including content), may be scanned by our systems for
> >> >>the purposes of information security and assessment of internal
> >> >>compliance with Accenture policy.
> >> >>______________________________________________________________________________________
> >> >>
> >> >>www.accenture.com
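Pulling the answers in this thread together for Sonali's original question: Kafka input and output are handled by Samza's built-in Kafka system (no custom consumer or SystemFactory), the message arrives as a String once the string serde is configured, and results go out through collector.send(...). What remains for Job 2 is the parsing step, which is plain Java. Below is a hypothetical sketch of that step; the comma-separated layout, the field indexes, the names `EventParser`/`extract`, and the `output-topic` stream name in the comment are all assumptions.

```java
// Hypothetical Job 2 helper. The comma-separated event layout and the field
// indexes passed in are assumptions for illustration, not taken from the thread.
//
// Inside a Samza StreamTask's process(...) it might be used like this, with the
// string serde configured for both input and output (stream name assumed):
//   String event = (String) envelope.getMessage();
//   collector.send(new OutgoingMessageEnvelope(
//       new SystemStream("kafka", "output-topic"),
//       envelope.getKey(),
//       EventParser.extract(event, 0, 2)));
final class EventParser {
    // Splits one event on commas and keeps only the requested fields (trimmed),
    // re-joined with commas so the result can be sent as a String message.
    static String extract(String event, int... fieldIndexes) {
        String[] fields = event.split(",", -1); // -1 keeps trailing empty fields
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < fieldIndexes.length; i++) {
            int idx = fieldIndexes[i];
            if (idx < 0 || idx >= fields.length) {
                throw new IllegalArgumentException("event has no field " + idx + ": " + event);
            }
            if (i > 0) {
                out.append(',');
            }
            out.append(fields[idx].trim());
        }
        return out.toString();
    }
}
```

Passing the incoming key through unchanged keeps the output partitioned by eventId as well. A plain split does not handle quoted CSV fields that contain commas; events like that would need a real CSV parser.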
