Hey Sonali,

In such a case, you probably just want to do stream-level overrides. First, register the JSON serde in your config:

serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

And then explicitly specify the serde for your output topic(s):

systems.kafka.streams.<your output stream>.samza.msg.serde=json
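Putting those together, a minimal sketch of the serde section of the job config might look like this (the output stream name is a placeholder, and it assumes the input topic keeps the string serde discussed further down in this thread):

  # Register the serdes by name.
  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
  serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

  # System-level default: messages on the kafka system are read as plain strings.
  systems.kafka.samza.msg.serde=string

  # Stream-level override: the output topic is written as JSON.
  systems.kafka.streams.parsed-events.samza.msg.serde=json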
Cheers,
Chris

On 3/19/14 3:21 PM, "[email protected]" <[email protected]> wrote:

> One quick question,
>
> systems.kafka.samza.msg.serde=string -- is this property for the output, aka
> the new Kafka topic I'm trying to produce to?
>
> What happens if the input message is a String and the output is JSON?
>
> Thanks,
> Sonali
>
> -----Original Message-----
> From: Yan Fang [mailto:[email protected]]
> Sent: Wednesday, March 19, 2014 2:31 PM
> To: [email protected]
> Subject: Re: Writing my Custom Job
>
> Make sure systems.kafka.samza.msg.serde=string
>
> Fang, Yan
> [email protected]
> +1 (206) 849-4108
>
> On Wed, Mar 19, 2014 at 2:21 PM, <[email protected]> wrote:
>
>> Hmm, so I read the input, which is a String, and want to write to the
>> output topic also as a string, so I used the StringSerdeFactory. But
>> Samza complains and wants the JsonSerdeFactory:
>>
>> Exception in thread "main" org.apache.samza.SamzaException: Serde json for system kafka does not exist in configuration.
>>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>>     at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>>     at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:185)
>>     at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:183)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>>     at scala.collection.AbstractSet.map(Set.scala:47)
>>     at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:183)
>>     at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:180)
>>     at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:207)
>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>
>> Why is that?
>>
>> -----Original Message-----
>> From: Chris Riccomini [mailto:[email protected]]
>> Sent: Wednesday, March 19, 2014 12:40 PM
>> To: [email protected]
>> Subject: Re: Writing my Custom Job
>>
>> Hey Guys,
>>
>> This is a transitive dependency from Kafka. My guess is that the Kafka
>> pom files published to Maven are not quite working properly. We've
>> already had some issues with them. Yan's suggestion is an appropriate
>> fix. The only thing I'd add is that you can make it a runtime
>> dependency:
>>
>>   <scope>runtime</scope>
>>
>> Cheers,
>> Chris
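For reference, Yan's dependency (quoted below) combined with the runtime scope Chris mentions would look roughly like this in the job's pom.xml -- version 2.2.0 is simply the one Yan used:

  <!-- yammer metrics classes that Kafka needs at runtime but does not pull in
       correctly through its published pom. -->
  <dependency>
    <groupId>com.yammer.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>2.2.0</version>
    <scope>runtime</scope>
  </dependency>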
>> On 3/19/14 12:32 PM, "Yan Fang" <[email protected]> wrote:
>>
>>> Hi Sonali,
>>>
>>> I am not sure how Chris will respond. My quick fix for this problem
>>> was simply adding
>>>
>>>   <dependency>
>>>     <groupId>com.yammer.metrics</groupId>
>>>     <artifactId>metrics-core</artifactId>
>>>     <version>2.2.0</version>
>>>   </dependency>
>>>
>>> to the pom.xml file when compiling...
>>>
>>> Hope that will help.
>>>
>>> Thanks,
>>>
>>> Fang, Yan
>>> [email protected]
>>> +1 (206) 849-4108
>>>
>>> On Wed, Mar 19, 2014 at 11:23 AM, <[email protected]> wrote:
>>>
>>>> So I downloaded and built the latest Samza code. I get this error:
>>>>
>>>> Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>>>>     at kafka.metrics.KafkaMetricsGroup$class.newTimer(Unknown Source)
>>>>     at kafka.producer.ProducerRequestMetrics.newTimer(Unknown Source)
>>>>     at kafka.producer.ProducerRequestMetrics.<init>(Unknown Source)
>>>>     at kafka.producer.ProducerRequestStats.<init>(Unknown Source)
>>>>     at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>>>>     at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
>>>>     at kafka.utils.Pool.getAndMaybePut(Unknown Source)
>>>>     at kafka.producer.ProducerRequestStatsRegistry$.getProducerRequestStats(Unknown Source)
>>>>     at kafka.producer.SyncProducer.<init>(Unknown Source)
>>>>     at kafka.producer.ProducerPool$.createSyncProducer(Unknown Source)
>>>>     at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>>>>     at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>>>>     at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
>>>>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:208)
>>>>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>>>>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
>>>>     at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:54)
>>>>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:146)
>>>>     at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
>>>>     at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:98)
>>>>     at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:86)
>>>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>>>>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>>>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>>>     at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:86)
>>>>     at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
>>>>     at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:78)
>>>>     at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>>>> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>>     ... 30 more
>>>>
>>>> -----Original Message-----
>>>> From: Chris Riccomini [mailto:[email protected]]
>>>> Sent: Tuesday, March 18, 2014 8:06 PM
>>>> To: [email protected]
>>>> Subject: Re: Writing my Custom Job
>>>>
>>>> Hey Sonali,
>>>>
>>>> Line 65 currently looks like:
>>>>
>>>>   val amClient = new AMRMClientImpl[ContainerRequest]
>>>>
>>>> There is no call to Option().get, which is what would trigger this
>>>> exception. Are you running the latest Samza code from the master branch?
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 3/18/14 5:11 PM, "[email protected]" <[email protected]> wrote:
>>>>
>>>>> Has this happened to anyone before?
>>>>>
>>>>> Exception in thread "main" java.util.NoSuchElementException: None.get
>>>>>     at scala.None$.get(Option.scala:313)
>>>>>     at scala.None$.get(Option.scala:311)
>>>>>     at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:65)
>>>>>     at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>>>>>
>>>>> -----Original Message-----
>>>>> From: Garry Turkington [mailto:[email protected]]
>>>>> Sent: Tuesday, March 18, 2014 4:16 PM
>>>>> To: [email protected]
>>>>> Subject: RE: Writing my Custom Job
>>>>>
>>>>> Hi,
>>>>>
>>>>> 1. Specifically, since I'm using Kafka I don't have to write
>>>>> Consumer and SystemFactory classes? Correct?
>>>>>
>>>>> Correct.
>>>>>
>>>>> 2. For the SamzaStreamTask would the input be a String? i.e.
>>>>>    String event = (String)envelope.getMessage();
>>>>>
>>>>> Correct -- just make sure you have the serdes on the system/stream
>>>>> configured as string.
>>>>>
>>>>> Garry
>>>>>
>>>>> -----Original Message-----
>>>>> From: [email protected] [mailto:[email protected]]
>>>>> Sent: 18 March 2014 22:57
>>>>> To: [email protected]
>>>>> Subject: RE: Writing my Custom Job
>>>>>
>>>>> Hey Chris,
>>>>>
>>>>> Thanks for the quick response!
>>>>>
>>>>> So the thing is, we have the Kafka producer independent of Samza.
>>>>> The idea is to test the Kafka streams with different CEPs; one
>>>>> example would be Storm. That's why I have a separate Kafka job
>>>>> running that reads from a file and writes to a Kafka topic.
>>>>>
>>>>> So assuming there is a topic, say "input-topic" (where the message
>>>>> is some event of type "String" and the key is the actual eventId of
>>>>> the event), already in place, I want to write a SamzaStreamTask that
>>>>> will read this string, parse it, and write to another Kafka topic.
>>>>> In other words, Job 1 is already done independent of Samza. I'm
>>>>> working on Job 2 using Samza.
>>>>>
>>>>> 1. Specifically, since I'm using Kafka I don't have to write
>>>>> Consumer and SystemFactory classes? Correct?
>>>>>
>>>>> 2. For the SamzaStreamTask would the input be a String? i.e.
>>>>>    String event = (String)envelope.getMessage();
>>>>>
>>>>> Thanks!
>>>>> Sonali
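A minimal sketch of what that Job 2 StreamTask could look like, using the collector.send(...) approach Chris describes below -- the class name, output topic, and field names are placeholders, not anything from hello-samza:

  package com.example.jobs;

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.samza.system.IncomingMessageEnvelope;
  import org.apache.samza.system.OutgoingMessageEnvelope;
  import org.apache.samza.system.SystemStream;
  import org.apache.samza.task.MessageCollector;
  import org.apache.samza.task.StreamTask;
  import org.apache.samza.task.TaskCoordinator;

  public class EventParserTask implements StreamTask {
    // Output goes back to the "kafka" system; the topic name is a placeholder.
    private static final SystemStream OUTPUT = new SystemStream("kafka", "parsed-events");

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
      // With the string serde configured on the input stream, the message is a String.
      String event = (String) envelope.getMessage();

      // Placeholder parsing: pull out whichever fields you care about.
      String[] fields = event.split(",");

      Map<String, Object> parsed = new HashMap<String, Object>();
      parsed.put("eventId", envelope.getKey());
      parsed.put("firstField", fields[0]);

      // Re-key by event ID; the map is serialized by whatever serde is
      // configured for the output stream (json in the override above).
      collector.send(new OutgoingMessageEnvelope(OUTPUT, envelope.getKey(), parsed));
    }
  }

Whatever you put in the outgoing envelope has to line up with the serde configured for that stream: a plain String for the string serde, or a map/POJO for the json serde.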
>>>>> -----Original Message-----
>>>>> From: Chris Riccomini [mailto:[email protected]]
>>>>> Sent: Tuesday, March 18, 2014 3:16 PM
>>>>> To: [email protected]
>>>>> Subject: Re: Writing my Custom Job
>>>>>
>>>>> Hey Sonali,
>>>>>
>>>>> 1. For CSV file reading, you should check this JIRA out:
>>>>>
>>>>>    https://issues.apache.org/jira/browse/SAMZA-138
>>>>>
>>>>> 2. You don't need to write to a Kafka topic using the standard
>>>>> Kafka producer. You can use the collector that comes as part of
>>>>> the process method. Take a look at one of the hello-samza examples
>>>>> to see how this is done. (collector.send(...))
>>>>>
>>>>> 3. To parse the string, retrieve specific fields, etc., you should
>>>>> write a second StreamTask that reads from the first. The flow
>>>>> should look like:
>>>>>
>>>>>    <file> -> Job 1 -> Kafka topic 1 -> Job 2 -> Kafka topic 2
>>>>>
>>>>> where "Job 1" sends messages to "Kafka topic 1" partitioned by
>>>>> event ID, and "Job 2" parses and retrieves specific fields, and
>>>>> produces to "Kafka topic 2".
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On 3/18/14 2:48 PM, "[email protected]" <[email protected]> wrote:
>>>>>
>>>>>> Hey Guys,
>>>>>>
>>>>>> So I'm writing my custom job in Samza and wanted to make sure I'm
>>>>>> not re-inventing the wheel.
>>>>>>
>>>>>> I have a Kafka job running that reads from a CSV file and writes
>>>>>> to a topic. I wrote this using the Kafka producer API independent
>>>>>> of Samza. The output is a KeyedMessage with the key being my
>>>>>> eventId and the value a string corresponding to my event.
>>>>>>
>>>>>> Now, I want to write a Samza consumer that listens on my topic,
>>>>>> parses the string to retrieve specific fields I'm interested in,
>>>>>> and writes them out to a different Kafka topic.
>>>>>>
>>>>>> Are there existing classes I can leverage to do this?
>>>>>>
>>>>>> Thanks,
>>>>>> Sonali
>>>>>>
>>>>>> Sonali Parthasarathy
>>>>>> R&D Developer, Data Insights
>>>>>> Accenture Technology Labs
>>>>>> 703-341-7432
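For completeness, a rough sketch of what the Job 2 configuration might look like under the setup described in this thread -- the job, class, and topic names are placeholders, and the Kafka connection settings are omitted:

  job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
  job.name=event-parser

  # The StreamTask that parses events (placeholder class name).
  task.class=com.example.jobs.EventParserTask
  # Read from the existing topic produced by the standalone Kafka job.
  task.inputs=kafka.input-topic

  # Kafka system definition (consumer/producer connection settings omitted).
  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

  # Serde registration and the per-stream json override for the output topic
  # go here, as shown at the top of this thread.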
