Hey Telles,

The code you've posted in Produce.java shows:

            KeyedMessage<String, String> data =
                new KeyedMessage<String, String>("consumptions", String.valueOf(key), String.valueOf(value));


This suggests that you are sending a string for both the key and the value. If
you have a Samza task consuming from this topic, you should set:

systems.system-name.samza.key.serde=string

systems.system-name.samza.msg.serde=string


Cheers,
Chris
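
For reference, the two settings above assume a serde has been registered under the name "string" in the same job config. A minimal sketch of the full wiring, in the 0.7-era property style used elsewhere in this thread (replace "kafka" with your system name):

```properties
# Register the string serde implementation under the name "string"
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Use it for both key and message on the Kafka system
systems.kafka.samza.key.serde=string
systems.kafka.samza.msg.serde=string
```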

On 8/7/14 11:52 AM, "Telles Nobrega" <[email protected]> wrote:

>Hmm, that sounds like a perfect reason for it.
>
>I'm writing to the topic with this code
>
>https://github.com/tellesnobrega/kafka-producer/blob/master/src/Produce.java
>
>My problem is that I need to send numbers as the key and value to the Kafka
>topic so I can read them in Samza.
>
>What is the best way to de/serialize this?
>
>
>On Thu, Aug 7, 2014 at 3:43 PM, Yan Fang <[email protected]> wrote:
>
>> Hi Telles,
>>
>> One of the possible reasons is that, in your process method, you are
>> trying to send a HashMap, not a String, in the collector.send. Could you
>> check?
>>
>> Thanks,
>>
>> Fang, Yan
>> [email protected]
>> +1 (206) 849-4108
>>
>>
>> On Thu, Aug 7, 2014 at 11:25 AM, Telles Nobrega
>><[email protected]>
>> wrote:
>>
>> > I changed my properties a little to look like this, following this link:
>> > http://mail-archives.apache.org/mod_mbox/samza-dev/201311.mbox/%3CEA1B8C3[email protected]%3E
>> >
>> > here it goes:
>> >
>> > # Job
>> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> > job.name=consumptions
>> >
>> > # YARN
>> > yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>> >
>> > # Task
>> > task.class=alarm.ConsumptionProducer
>> > task.inputs=kafka.consumptions
>> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> > task.checkpoint.system=kafka
>> > # Normally, this would be 3, but we have only one broker.
>> > task.checkpoint.replication.factor=1
>> >
>> > # Metrics
>> > metrics.reporters=snapshot,jmx
>> > metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>> > metrics.reporter.snapshot.stream=kafka.metrics
>> > metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>> >
>> > # Serializers
>> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> > serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>> >
>> > # Kafka System
>> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> > systems.kafka.samza.msg.serde=string
>> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
>> > systems.kafka.consumer.auto.offset.reset=largest
>> > systems.kafka.producer.metadata.broker.list=localhost:9092
>> > systems.kafka.producer.producer.type=sync
>> > # Normally, we'd set this much higher, but we want things to look snappy
>> > # in the demo.
>> > systems.kafka.producer.batch.num.messages=1
>> > systems.kafka.streams.metrics.samza.msg.serde=metrics
>> >
>> > But I'm getting this output:
>> >
>> > Caught exception in process loop.
>> > java.lang.ClassCastException: java.util.HashMap cannot be cast to java.lang.String
>> >  at org.apache.samza.serializers.StringSerde.toBytes(StringSerde.scala:33)
>> >  at org.apache.samza.serializers.SerdeManager.toBytes(SerdeManager.scala:69)
>> >  at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:65)
>> >  at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170)
>> >  at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170)
>> >  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >  at org.apache.samza.container.TaskInstance.send(TaskInstance.scala:170)
>> >  at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116)
>> >  at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116)
>> >  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>> >  at org.apache.samza.container.RunLoop.send(RunLoop.scala:116)
>> >  at org.apache.samza.container.RunLoop.run(RunLoop.scala:59)
>> >  at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
>> >  at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
>> >  at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> >
>> >
>> >
>> > On Thu, Aug 7, 2014 at 3:12 PM, Telles Nobrega
>><[email protected]>
>> > wrote:
>> >
>> > > Hi Chris, I really appreciate the time you are taking to help me out.
>> > >
>> > > This is job.properties file
>> > >
>> > > # Job
>> > > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> > > job.name=consumptions
>> > >
>> > > # YARN
>> > > yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>> > >
>> > > # Task
>> > > task.class=alarm.ConsumptionProducer
>> > > task.inputs=kafka.consumptions
>> > > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> > > task.checkpoint.system=kafka
>> > > # Normally, this would be 3, but we have only one broker.
>> > > task.checkpoint.replication.factor=1
>> > >
>> > > # Serializers
>> > > serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFactory
>> > >
>> > > # Kafka System
>> > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> > > *systems.kafka.samza.msg.serde=json*
>> > > systems.kafka.consumer.zookeeper.connect=localhost:2181/
>> > > systems.kafka.consumer.auto.offset.reset=largest
>> > > systems.kafka.producer.metadata.broker.list=localhost:9092
>> > > systems.kafka.producer.producer.type=sync
>> > > # Normally, we'd set this much higher, but we want things to look snappy
>> > > # in the demo.
>> > > systems.kafka.producer.batch.num.messages=1
>> > >
>> > > *systems.kafka.streams.consumptions.key.serde=string*
>> > > *systems.kafka.streams.consumptions.msg.serde=string*
>> > >
>> > > Does this look right?
>> > > I'm running a local cluster; I want to have it running nicely before I
>> > > can distribute it.
>> > >
>> > >
>> > >
>> > > On Thu, Aug 7, 2014 at 3:08 PM, Chris Riccomini <
>> > > [email protected]> wrote:
>> > >
>> > >> Hey Telles,
>> > >>
>> > >> Sure. In your job.properties file, define the serde:
>> > >>
>> > >> # Serializers
>> > >> serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFactory
>> > >>
>> > >>
>> > >> Then define the serde for your system:
>> > >>
>> > >> systems.kafka.samza.msg.serde=string
>> > >>
>> > >>
>> > >> Cheers,
>> > >> Chris
>> > >>
>> > >> On 8/7/14 10:54 AM, "Telles Nobrega" <[email protected]>
>>wrote:
>> > >>
>> > >> >Can you give an example of how to use the string serde? I'm getting
>> > >> >an error when trying to set it to string:
>> > >> >
>> > >> >17:53:26:804Got system producers: Set(kafka)
>> > >> >17:53:26:809Got serdes: Set(string)
>> > >> >17:53:29:206Container container_1407433587782_0001_01_000017 failed
>> > >> >with exit code 1 - Exception from container-launch:
>> > >> >
>> > >> >
>> > >> >
>> > >> >On Thu, Aug 7, 2014 at 2:41 PM, Telles Nobrega <
>> > [email protected]>
>> > >> >wrote:
>> > >> >
>> > >> >> Thanks.
>> > >> >>
>> > >> >>
>> > >> >> On Thu, Aug 7, 2014 at 1:54 PM, Chris Riccomini <
>> > >> >> [email protected]> wrote:
>> > >> >>
>> > >> >>> Hey Telles,
>> > >> >>>
>> > >> >>> This is definitely a serde error. It sounds like your message is
>> > >> >>> not properly formatted as a JSON blob.
>> > >> >>>
>> > >> >>> If you are trying to just use a string as the message (vs. a
>> > >> >>> well-formatted JSON blob), then you should use the StringSerde.
>> > >> >>>
>> > >> >>> Cheers,
>> > >> >>> Chris
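
For completeness, the JSON alternative Chris mentions would be wired the same way; a sketch, assuming the serde name "json" (the name is arbitrary, but it must match on both lines):

```properties
# If the messages really are JSON blobs, register the JSON serde instead
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
systems.kafka.samza.msg.serde=json
```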
>> > >> >>>
>> > >> >>> On 8/7/14 8:05 AM, "Telles Nobrega" <[email protected]>
>> > wrote:
>> > >> >>>
>> > >> >>> >Hi, I'm running a simple Samza topology that reads from a Kafka
>> > >> >>> >topic that only has two strings:
>> > >> >>> >xx:xx:xx:xxxx;xx
>> > >> >>> >and it's throwing an error:
>> > >> >>> >
>> > >> >>> >Caught exception in process loop.
>> > >> >>> >org.codehaus.jackson.JsonParseException: Unexpected character ('F' (code 70)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
>> > >> >>> > at [Source: [B@56dfb465; line: 1, column: 2]
>> > >> >>> > at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>> > >> >>> > at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>> > >> >>> > at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>> > >> >>> > at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>> > >> >>> > at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>> > >> >>> > at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>> > >> >>> > at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>> > >> >>> > at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>> > >> >>> > at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>> > >> >>> > at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>> > >> >>> > at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>> > >> >>> > at org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$SystemConsumers$$poll$5.apply(SystemConsumers.scala:245)
>> > >> >>> > at org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$SystemConsumers$$poll$5.apply(SystemConsumers.scala:242)
>> > >> >>> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> > >> >>> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> > >> >>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> > >> >>> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> > >> >>> > at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:242)
>> > >> >>> > at org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(SystemConsumers.scala:180)
>> > >> >>> > at org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(SystemConsumers.scala:180)
>> > >> >>> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> > >> >>> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> > >> >>> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> > >> >>> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> > >> >>> > at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>> > >> >>> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> > >> >>> > at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>> > >> >>> > at scala.collection.SetLike$class.map(SetLike.scala:93)
>> > >> >>> > at scala.collection.AbstractSet.map(Set.scala:47)
>> > >> >>> > at org.apache.samza.system.SystemConsumers$$anon$1.call(SystemConsumers.scala:180)
>> > >> >>> > at org.apache.samza.util.DoublingBackOff.maybeCall(DoublingBackOff.scala:44)
>> > >> >>> > at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:208)
>> > >> >>> > at org.apache.samza.container.RunLoop.process(RunLoop.scala:73)
>> > >> >>> > at org.apache.samza.container.RunLoop.run(RunLoop.scala:57)
>> > >> >>> > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
>> > >> >>> > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
>> > >> >>> > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> > >> >>> >
>> > >> >>> >
>> > >> >>> >
>> > >> >>> >Has anyone experienced this error before?
>> > >> >>> >
>> > >> >>> >--
>> > >> >>> >------------------------------------------
>> > >> >>> >Telles Mota Vidal Nobrega
>> > >> >>> >M.sc. Candidate at UFCG
>> > >> >>> >B.sc. in Computer Science at UFCG
>> > >> >>> >Software Engineer at OpenStack Project - HP/LSD-UFCG
>> > >> >>>
>> > >> >>>
>> > >> >>
>> > >> >>
>> > >> >>
>> > >> >
>> > >> >
>> > >> >
>> > >>
>> > >>
>> > >
>> > >
>> > >
>> >
>> >
>> >
>> >
>>
>
>
>
