I changed my properties a little to look like the ones in this link:
<http://mail-archives.apache.org/mod_mbox/samza-dev/201311.mbox/%[email protected]%3E>

Here they are:

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=consumptions

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=alarm.ConsumptionProducer
task.inputs=kafka.consumptions
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1

# Metrics
metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.metadata.broker.list=localhost:9092
systems.kafka.producer.producer.type=sync
# Normally, we'd set this much higher, but we want things to look snappy in the demo.
systems.kafka.producer.batch.num.messages=1
systems.kafka.streams.metrics.samza.msg.serde=metrics

But I'm getting this output,

0Caught exception in process loop.
java.lang.ClassCastException: java.util.HashMap cannot be cast to java.lang.String
 at org.apache.samza.serializers.StringSerde.toBytes(StringSerde.scala:33)
 at org.apache.samza.serializers.SerdeManager.toBytes(SerdeManager.scala:69)
 at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:65)
 at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170)
 at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.samza.container.TaskInstance.send(TaskInstance.scala:170)
 at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116)
 at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
 at org.apache.samza.container.RunLoop.send(RunLoop.scala:116)
 at org.apache.samza.container.RunLoop.run(RunLoop.scala:59)
 at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
 at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
 at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
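
For anyone reading along, here is a minimal sketch of what the trace above seems to point at, assuming the task passes a HashMap as the outgoing message while the stream is configured with the string serde. SerdeCastDemo and its methods are hypothetical stand-ins written for illustration; they are not Samza classes and do not reproduce Samza's actual StringSerde code.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SerdeCastDemo {
    // Hypothetical stand-in for a string serde: it only accepts String messages.
    public static byte[] stringToBytes(Object message) {
        // This cast throws ClassCastException when the message is a Map.
        return ((String) message).getBytes(StandardCharsets.UTF_8);
    }

    // Returns true when the stand-in serde rejects the message with a ClassCastException.
    public static boolean failsWithClassCast(Object message) {
        try {
            stringToBytes(message);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Assumed shape of the message the task sends; the key name is made up.
        Map<String, Object> asMap = new HashMap<>();
        asMap.put("reading", "xx:xx:xx:xxxx;xx");

        System.out.println("map rejected: " + failsWithClassCast(asMap));
        System.out.println("string rejected: " + failsWithClassCast("xx:xx:xx:xxxx;xx"));
    }
}
```

If that assumption holds, either the task would need to send a String, or the output stream would need a serde that can handle a map.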



On Thu, Aug 7, 2014 at 3:12 PM, Telles Nobrega <[email protected]>
wrote:

> Hi Chris, I really appreciate the time you are taking to help me out.
>
> This is job.properties file
>
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=consumptions
>
> # YARN
>
> yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>
> # Task
> task.class=alarm.ConsumptionProducer
> task.inputs=kafka.consumptions
>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=kafka
> # Normally, this would be 3, but we have only one broker.
> task.checkpoint.replication.factor=1
>
> # Serializers
>
> serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFactory
>
> # Kafka System
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> *systems.kafka.samza.msg.serde=json*
> systems.kafka.consumer.zookeeper.connect=localhost:2181/
> systems.kafka.consumer.auto.offset.reset=largest
> systems.kafka.producer.metadata.broker.list=localhost:9092
> systems.kafka.producer.producer.type=sync
> # Normally, we'd set this much higher, but we want things to look snappy in the demo.
> systems.kafka.producer.batch.num.messages=1
>
> *systems.kafka.streams.consumptions.key.serde=string*
> *systems.kafka.streams.consumptions.msg.serde=string*
>
> Does this look right?
> I'm running a local cluster; I want to have it running nicely before I
> distribute it.
>
>
>
> On Thu, Aug 7, 2014 at 3:08 PM, Chris Riccomini <
> [email protected]> wrote:
>
>> Hey Telles,
>>
>> Sure. In your job.properties file, define the serde:
>>
>> # Serializers
>>
>> serializers.registry.serde.class=org.apache.samza.serializers.StringSerdeFactory
>>
>>
>> Then define the serde for your system:
>>
>> systems.kafka.samza.msg.serde=string
>>
>>
>> Cheers,
>> Chris
>>
>> On 8/7/14 10:54 AM, "Telles Nobrega" <[email protected]> wrote:
>>
>> >Can you give an example of how to use the string serde? I'm getting an
>> >error when trying to set it to string
>> >
>> >:53:26:804Got system producers: Set(kafka)
>> >17:53:26:809Got serdes: Set(string)
>> >17:53:29:206Container container_1407433587782_0001_01_000017 failed with
>> >exit code 1 - Exception from container-launch:
>> >
>> >
>> >
>> >On Thu, Aug 7, 2014 at 2:41 PM, Telles Nobrega <[email protected]>
>> >wrote:
>> >
>> >> Thanks.
>> >>
>> >>
>> >> On Thu, Aug 7, 2014 at 1:54 PM, Chris Riccomini <
>> >> [email protected]> wrote:
>> >>
>> >>> Hey Telles,
>> >>>
>> >>> This is definitely a serde error. It sounds like your message is not
>> >>> properly formatted as a JSON blob.
>> >>>
>> >>> If you are trying to just use a string as the message (vs. a well
>> >>> formatted JSON blob), then you should use the StringSerde.
>> >>>
>> >>> Cheers,
>> >>> Chris
>> >>>
>> >>> On 8/7/14 8:05 AM, "Telles Nobrega" <[email protected]> wrote:
>> >>>
>> >>> >Hi, I'm running a simple Samza topology that reads from a Kafka topic
>> >>> >that only has two Strings
>> >>> >xx:xx:xx:xxxx;xx
>> >>> >And it's throwing an error
>> >>> >
>> >>> >Caught exception in process loop.
>> >>> >org.codehaus.jackson.JsonParseException: Unexpected character ('F' (code 70)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
>> >>> > at [Source: [B@56dfb465; line: 1, column: 2]
>> >>> > at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>> >>> > at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>> >>> > at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>> >>> > at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>> >>> > at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>> >>> > at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>> >>> > at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>> >>> > at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>> >>> > at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>> >>> > at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>> >>> > at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>> >>> > at org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$SystemConsumers$$poll$5.apply(SystemConsumers.scala:245)
>> >>> > at org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$SystemConsumers$$poll$5.apply(SystemConsumers.scala:242)
>> >>> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >>> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >>> > at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:242)
>> >>> > at org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(SystemConsumers.scala:180)
>> >>> > at org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(SystemConsumers.scala:180)
>> >>> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >>> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >>> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>> > at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>> >>> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> >>> > at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>> >>> > at scala.collection.SetLike$class.map(SetLike.scala:93)
>> >>> > at scala.collection.AbstractSet.map(Set.scala:47)
>> >>> > at org.apache.samza.system.SystemConsumers$$anon$1.call(SystemConsumers.scala:180)
>> >>> > at org.apache.samza.util.DoublingBackOff.maybeCall(DoublingBackOff.scala:44)
>> >>> > at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:208)
>> >>> > at org.apache.samza.container.RunLoop.process(RunLoop.scala:73)
>> >>> > at org.apache.samza.container.RunLoop.run(RunLoop.scala:57)
>> >>> > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
>> >>> > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
>> >>> > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> >>> >
>> >>> >
>> >>> >
>> >>> >Has anyone experienced this error before?
>> >>> >
>> >>> >--
>> >>> >------------------------------------------
>> >>> >Telles Mota Vidal Nobrega
>> >>> >M.sc. Candidate at UFCG
>> >>> >B.sc. in Computer Science at UFCG
>> >>> >Software Engineer at OpenStack Project - HP/LSD-UFCG
>> >>>
>> >>>
>> >>
>> >>
>> >> --
>> >> ------------------------------------------
>> >> Telles Mota Vidal Nobrega
>> >> M.sc. Candidate at UFCG
>> >> B.sc. in Computer Science at UFCG
>> >> Software Engineer at OpenStack Project - HP/LSD-UFCG
>> >>
>> >
>> >
>> >
>> >--
>> >------------------------------------------
>> >Telles Mota Vidal Nobrega
>> >M.sc. Candidate at UFCG
>> >B.sc. in Computer Science at UFCG
>> >Software Engineer at OpenStack Project - HP/LSD-UFCG
>>
>>
>
>
> --
> ------------------------------------------
> Telles Mota Vidal Nobrega
> M.sc. Candidate at UFCG
> B.sc. in Computer Science at UFCG
> Software Engineer at OpenStack Project - HP/LSD-UFCG
>



-- 
------------------------------------------
Telles Mota Vidal Nobrega
M.sc. Candidate at UFCG
B.sc. in Computer Science at UFCG
Software Engineer at OpenStack Project - HP/LSD-UFCG
