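Make sure systems.kafka.samza.msg.serde=string is set in your job config.
The exception suggests the config still names "json" as the serde for the
kafka system.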
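
For reference, a minimal sketch of the serde lines in the job's .properties
file. It assumes the stock StringSerdeFactory that ships with Samza and that
the message key (the event ID) is also a plain string; the key serde line is
an assumption on my side, not something confirmed in this thread.

```properties
# Register a serde under the name "string" (the name itself is arbitrary).
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Use that serde for message values on the "kafka" system.
systems.kafka.samza.msg.serde=string

# Assumption: keys are plain strings as well.
systems.kafka.samza.key.serde=string
```

Whatever name appears on the right-hand side of a *.serde setting needs a
matching serializers.registry.<name>.class entry.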

Fang, Yan
[email protected]
+1 (206) 849-4108


On Wed, Mar 19, 2014 at 2:21 PM, <[email protected]> wrote:

> Hmm, so I read the input, which is a String, and want to write it to the
> output topic as a String too. So I used the StringSerdeFactory, but my
> Samza job complains and wants the JsonSerdeFactory:
> Exception in thread "main" org.apache.samza.SamzaException: Serde json for system kafka does not exist in configuration.
>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17$$anonfun$16.apply(SamzaContainer.scala:185)
>         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:185)
>         at org.apache.samza.container.SamzaContainer$$anonfun$15$$anonfun$apply$17.apply(SamzaContainer.scala:183)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>         at scala.collection.SetLike$class.map(SetLike.scala:93)
>         at scala.collection.AbstractSet.map(Set.scala:47)
>         at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:183)
>         at org.apache.samza.container.SamzaContainer$$anonfun$15.apply(SamzaContainer.scala:180)
>         at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:207)
>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>
> Why is that?
>
> -----Original Message-----
> From: Chris Riccomini [mailto:[email protected]]
> Sent: Wednesday, March 19, 2014 12:40 PM
> To: [email protected]
> Subject: Re: Writing my Custom Job
>
> Hey Guys,
>
> This is a transitive dependency from Kafka. My guess is that the Kafka pom
> files published to maven are not quite working properly. We've already had
> some issues with them. Yan's suggestion is an appropriate fix. The only
> thing I'd add is that you can make them a runtime dependency.
>
>       <scope>runtime</scope>
>
>
> Cheers,
> Chris
>
> On 3/19/14 12:32 PM, "Yan Fang" <[email protected]> wrote:
>
> >Hi Sonali,
> >
> >I am not sure how Chris will respond. My quick fix for this problem
> >was simply adding the following dependency to the pom.xml file when
> >compiling:
> >
> >  <dependency>
> >    <groupId>com.yammer.metrics</groupId>
> >    <artifactId>metrics-core</artifactId>
> >    <version>2.2.0</version>
> >  </dependency>
> >
> >Hope that will help.
> >
> >Thanks,
> >
> >Fang, Yan
> >[email protected]
> >+1 (206) 849-4108
> >
> >
> >On Wed, Mar 19, 2014 at 11:23 AM,
> ><[email protected]> wrote:
> >
> >> So I downloaded and built the latest samza code. I get this error:
> >> Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
> >>         at kafka.metrics.KafkaMetricsGroup$class.newTimer(Unknown Source)
> >>         at kafka.producer.ProducerRequestMetrics.newTimer(Unknown Source)
> >>         at kafka.producer.ProducerRequestMetrics.<init>(Unknown Source)
> >>         at kafka.producer.ProducerRequestStats.<init>(Unknown Source)
> >>         at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
> >>         at kafka.producer.ProducerRequestStatsRegistry$$anonfun$2.apply(Unknown Source)
> >>         at kafka.utils.Pool.getAndMaybePut(Unknown Source)
> >>         at kafka.producer.ProducerRequestStatsRegistry$.getProducerRequestStats(Unknown Source)
> >>         at kafka.producer.SyncProducer.<init>(Unknown Source)
> >>         at kafka.producer.ProducerPool$.createSyncProducer(Unknown Source)
> >>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
> >>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
> >>         at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:208)
> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$6.apply(KafkaSystemAdmin.scala:149)
> >>         at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:54)
> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:146)
> >>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
> >>         at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:98)
> >>         at org.apache.samza.util.Util$$anonfun$getInputStreamPartitions$1.apply(Util.scala:86)
> >>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> >>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> >>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> >>         at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:86)
> >>         at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
> >>         at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:78)
> >>         at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> >> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
> >>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> >>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> >>         at java.security.AccessController.doPrivileged(Native Method)
> >>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> >>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> >>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> >>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> >>         ... 30 more
> >>
> >> -----Original Message-----
> >> From: Chris Riccomini [mailto:[email protected]]
> >> Sent: Tuesday, March 18, 2014 8:06 PM
> >> To: [email protected]
> >> Subject: Re: Writing my Custom Job
> >>
> >> Hey Sonali,
> >>
> >> Line 65 currently looks like:
> >>
> >>   val amClient = new AMRMClientImpl[ContainerRequest]
> >>
> >> There is no call to Option().get, which is what would trigger this
> >> exception. Are you running the latest Samza code from the master branch?
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 3/18/14 5:11 PM, "[email protected]"
> >> <[email protected]> wrote:
> >>
> >> >Has this happened to anyone before?
> >> >Exception in thread "main" java.util.NoSuchElementException: None.get
> >> >       at scala.None$.get(Option.scala:313)
> >> >       at scala.None$.get(Option.scala:311)
> >> >       at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:65)
> >> >       at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> >> >
> >> >-----Original Message-----
> >> >From: Garry Turkington [mailto:[email protected]]
> >> >Sent: Tuesday, March 18, 2014 4:16 PM
> >> >To: [email protected]
> >> >Subject: RE: Writing my Custom Job
> >> >
> >> >Hi,
> >> >
> >> >1. Specifically, since I'm using Kafka, I don't have to write
> >> >Consumer and SystemFactory classes, correct?
> >> >Correct
> >> >
> >> >2. For the SamzaStreamTask would the input be a String? i.e.
> >> >    String event = (String)envelope.getMessage();
> >> >
> >> >Correct -- just make sure you have the serdes on the system/stream
> >> >configured as string.
> >> >
> >> >Garry
> >> >
> >> >-----Original Message-----
> >> >From: [email protected]
> >> >[mailto:[email protected]]
> >> >Sent: 18 March 2014 22:57
> >> >To: [email protected]
> >> >Subject: RE: Writing my Custom Job
> >> >
> >> >Hey Chris,
> >> >
> >> >Thanks for the quick response!
> >> >
> >> >So the thing is, we have the kafka producer independent of Samza.
> >> >The idea is to test the kafka streams with different CEPs. So one
> >> >example would be Storm. That's why I have a separate kafka job
> >> >running that reads from a file and writes to a kafka topic.
> >> >
> >> >So assuming there is a topic say "input-topic" (where the message is
> >> >some event of type "String" and key is the actual eventId of the
> >> >event) already in place, I want to write a SamzaStreamTask that will
> >> >read this string, parse it and write to another kafka topic. In
> >> >other words, Job1 is already done independent of Samza. I'm working on
> >> >Job2 using Samza.
> >> >
> >> >1. Specifically, since I'm using Kafka, I don't have to write
> >> >Consumer and SystemFactory classes, correct?
> >> >2. For the SamzaStreamTask would the input be a String? i.e.
> >> >    String event = (String)envelope.getMessage();
> >> >
> >> >Thanks!
> >> >Sonali
> >> >
> >> >-----Original Message-----
> >> >From: Chris Riccomini [mailto:[email protected]]
> >> >Sent: Tuesday, March 18, 2014 3:16 PM
> >> >To: [email protected]
> >> >Subject: Re: Writing my Custom Job
> >> >
> >> >Hey Sonali,
> >> >
> >> >1. For CSV file reading, you should check this JIRA out:
> >> >
> >> >  https://issues.apache.org/jira/browse/SAMZA-138
> >> >
> >> >2. You don't need to write to a Kafka topic using the standard Kafka
> >> >producer. You can use the collector that comes as part of the
> >> >process method. Take a look at one of the hello-samza examples to
> >> >see how this is done. (collector.send(...))
> >> >
> >> >3. To parse the string, retrieve specific fields, etc, you should
> >> >write a second StreamTask that reads from the first. The flow should
> >> >look like:
> >> >
> >> ><file> -> Job 1 -> Kafka topic 1 -> Job 2 -> Kafka topic 2
> >> >
> >> >Where "Job 1" sends messages to "Kafka topic 1" partitioned by event
> >> >ID, and "Job 2" parses and retrieves specific fields, and produces
> >> >to "Kafka topic 2".
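
A rough sketch of the parsing step in "Job 2", in case it helps. It is plain
Java with no Samza imports, so it can be tried on its own. The class name
EventFieldParser, the comma delimiter, and the three field names are all
made up for illustration; nothing in this thread specifies the event layout.
Inside the StreamTask's process method you would call it on
(String) envelope.getMessage() and pass the result on via collector.send(...).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: the delimiter and field names below are assumptions,
// not taken from the actual event format.
final class EventFieldParser {
    // Assumed layout: eventId,type,timestamp[,anything else is ignored]
    private static final String[] FIELD_NAMES = {"eventId", "type", "timestamp"};

    private EventFieldParser() {}

    /** Splits a comma-delimited event and keeps only the named leading fields. */
    static Map<String, String> parse(String event) {
        // A limit of -1 keeps trailing empty fields instead of dropping them.
        String[] parts = event.split(",", -1);
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < FIELD_NAMES.length && i < parts.length; i++) {
            fields.put(FIELD_NAMES[i], parts[i].trim());
        }
        return fields;
    }
}
```

Keeping the parsing in a small static helper like this makes it easy to unit
test separately from the task itself.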
> >> >
> >> >Cheers,
> >> >Chris
> >> >
> >> >On 3/18/14 2:48 PM, "[email protected]"
> >> ><[email protected]> wrote:
> >> >
> >> >>Hey Guys,
> >> >>
> >> >>So I'm writing my custom job in Samza and wanted to make sure I'm
> >> >>not re-inventing the wheel.
> >> >>
> >> >>I have a kafka job running that reads from a csv file and writes to
> >> >>a topic. I wrote this using the kafka producer api independent of
> >> >>Samza.
> >> >>The output is a KeyedMessage with key being my eventId and the
> >> >>value is a string corresponding to my event.
> >> >>
> >> >>Now, I want to write a SamzaConsumer that listens on my topic,
> >> >>parses the string to retrieve specific fields I'm interested in and
> >> >>write it out to a different kafka topic.
> >> >>
> >> >>Are there existing classes I can leverage to do this?
> >> >>
> >> >>Thanks,
> >> >>Sonali
> >> >>
> >> >>Sonali Parthasarathy
> >> >>R&D Developer, Data Insights
> >> >>Accenture Technology Labs
> >> >>703-341-7432
> >> >>
> >> >>
> >> >>________________________________
> >> >>
> >> >>This message is for the designated recipient only and may contain
> >> >>privileged, proprietary, or otherwise confidential information. If
> >> >>you have received it in error, please notify the sender immediately
> >> >>and delete the original. Any other use of the e-mail by you is
> >> >>prohibited. Where allowed by local law, electronic communications
> >> >>with Accenture and its affiliates, including e-mail and instant
> >> >>messaging (including content), may be scanned by our systems for
> >> >>the purposes of information security and assessment of internal
> >> >>compliance with Accenture policy.
> >> >>______________________________________________________________________
> >> >>
> >> >>www.accenture.com
> >> >
> >> >
> >> >
> >>
> >>
> >>
>
>
>
