It worked.
On 07 Aug 2014, at 16:20, Chris Riccomini <[email protected]>
wrote:
> Hey Telles,
>
> Yea, as Yan suggested, you're sending a map back to the Kafka system:
>
>
> Map<String, Object> outgoingMap = Event.toMap(event);
> collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>
>
> Samza is going to take the object (outgoingMap) and try to encode it using
> StringSerde. It won't be able to do this, since the object you've given it
> is a map, not a string.
>
> The fix for this is to configure the default serde as StringSerdeFactory,
> which you've done, and to configure the output stream ("values") using the
> JsonSerdeFactory. This can be done with:
>
> systems.kafka.streams.values.samza.msg.serde=json
>
>
> And then defining the json serde, if you haven't already done so:
>
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFacto
> ry
>
>
> Cheers,
> Chris
>
>
>
> On 8/7/14 12:12 PM, "Telles Nobrega" <[email protected]> wrote:
>
>> Here it goes: http://pastebin.com/hhswEnyW
>>
>>
>> On Thu, Aug 7, 2014 at 4:10 PM, Chris Riccomini <
>> [email protected]> wrote:
>>
>>> Hey Telles,
>>>
>>> Can you paste the code for your StreamTask? If it's still showing the
>>> same
>>> message, then it sounds like you're still trying to send a HashMap
>>> using a
>>> StringSerde.
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 8/7/14 12:05 PM, "Telles Nobrega" <[email protected]> wrote:
>>>
>>>> Still showing the same message
>>>>
>>>>
>>>> On Thu, Aug 7, 2014 at 3:56 PM, Chris Riccomini <
>>>> [email protected]> wrote:
>>>>
>>>>> Hey Telles,
>>>>>
>>>>> The code you've posted in Produce.java shows:
>>>>>
>>>>> KeyedMessage<String, String> data = new
>>> KeyedMessage<String,
>>>>> String>("consumptions", String.valueOf(key),String.valueOf(value));
>>>>>
>>>>>
>>>>> Which suggests that you are sending a string for both key and value.
>>> If
>>>>> you have a Samza task consuming from this topic, you should set:
>>>>>
>>>>> systems.system-name.samza.key.serde=string
>>>>>
>>>>> systems.system-name.samza.msg.serde=string
>>>>>
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On 8/7/14 11:52 AM, "Telles Nobrega" <[email protected]> wrote:
>>>>>
>>>>>> Hum, that sounds like a perfect reason for it.
>>>>>>
>>>>>> I'm writing to the topic with this code
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>
>>> https://github.com/tellesnobrega/kafka-producer/blob/master/src/Produce.j
>>>>> a
>>>>>> va
>>>>>>
>>>>>> My problem is that I need to send numbers as key and value to the
>>> kafka
>>>>>> topic so i can read it in samza.
>>>>>>
>>>>>> What is the best way to de/serialize this?
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 7, 2014 at 3:43 PM, Yan Fang <[email protected]>
>>> wrote:
>>>>>>
>>>>>>> Hi Telles,
>>>>>>>
>>>>>>> One of the possible reasons is that, in your process method, you
>>> are
>>>>>>> trying
>>>>>>> to send a HashMap, not a String, in the collection.send. Could you
>>>>>>> check it
>>>>>>> ?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Fang, Yan
>>>>>>> [email protected]
>>>>>>> +1 (206) 849-4108
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 7, 2014 at 11:25 AM, Telles Nobrega
>>>>>>> <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I changed my properties a little to look like this: link
>>>>>>>> <
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>
>>> http://mail-archives.apache.org/mod_mbox/samza-dev/201311.mbox/%3CEA1B8C3
>>>>>>> [email protected]%3E
>>>>>>>>>
>>>>>>>>
>>>>>>>> here it goes:
>>>>>>>>
>>>>>>>> # Job
>>>>>>>> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>>>>>>>> job.name=consumptions
>>>>>>>>
>>>>>>>> # YARN
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> yarn.package.path=file://${basedir}/target/${project.artifactId}-${
>>>>> pom.ve
>>>>>>> rsion}-dist.tar.gz
>>>>>>>>
>>>>>>>> # Task
>>>>>>>> task.class=alarm.ConsumptionProducer
>>>>>>>> task.inputs=kafka.consumptions
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckp
>>>>>>> oi
>>>>>>> nt
>>>>>>> ManagerFactory
>>>>>>>> task.checkpoint.system=kafka
>>>>>>>> # Normally, this would be 3, but we have only one broker.
>>>>>>>> task.checkpoint.replication.factor=1
>>>>>>>>
>>>>>>>> # Metrics
>>>>>>>> metrics.reporters=snapshot,jmx
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.Met
>>>>>>> ri
>>>>>>> cs
>>>>>>> SnapshotReporterFactory
>>>>>>>> metrics.reporter.snapshot.stream=kafka.metrics
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxRepor
>>>>>>> te
>>>>>>> rF
>>>>>>> actory
>>>>>>>>
>>>>>>>> # Serializers
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> serializers.registry.string.class=org.apache.samza.serializers.String
>>>>>>> Se
>>>>>>> rd
>>>>>>> eFactory
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> serializers.registry.metrics.class=org.apache.samza.serializers.Metri
>>>>>>> cs
>>>>>>> Sn
>>>>>>> apshotSerdeFactory
>>>>>>>>
>>>>>>>> # Kafka System
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystem
>>>>>>> Fa
>>>>>>> ct
>>>>>>> ory
>>>>>>>> systems.kafka.samza.msg.serde=string
>>>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181/
>>>>>>>> systems.kafka.consumer.auto.offset.reset=largest
>>>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>>>>> systems.kafka.producer.producer.type=sync
>>>>>>>> # Normally, we'd set this much higher, but we want things to
>>> look
>>>>>>> snappy
>>>>>>> in
>>>>>>>> the demo.
>>>>>>>> systems.kafka.producer.batch.num.messages=1
>>>>>>>> ystems.kafka.streams.metrics.samza.msg.serde=metrics
>>>>>>>>
>>>>>>>> But I'm getting this output,
>>>>>>>>
>>>>>>>> 0Caught exception in process loop.
>>>>>>>> java.lang.ClassCastException: java.util.HashMap cannot be cast
>>> to
>>>>>>>> java.lang.String
>>>>>>>> at
>>>>>>>
>>>>> org.apache.samza.serializers.StringSerde.toBytes(StringSerde.scala:33)
>>>>>>>> at
>>>>>>>
>>>
>>>>> org.apache.samza.serializers.SerdeManager.toBytes(SerdeManager.scala:69
>>>>> )
>>>>>>>> at
>>>>>>>
>>>>> org.apache.samza.system.SystemProducers.send(SystemProducers.scala:65)
>>>>>>>> at
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskIns
>>>>>>> ta
>>>>>>> nc
>>>>>>> e.scala:170)
>>>>>>>> at
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskIns
>>>>>>> ta
>>>>>>> nc
>>>>>>> e.scala:170)
>>>>>>>> at
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.
>>>>>>> sc
>>>>>>> al
>>>>>>> a:59)
>>>>>>>> at
>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>> at
>>>>>
>>>>> org.apache.samza.container.TaskInstance.send(TaskInstance.scala:170)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scal
>>>>>>> a:
>>>>>>> 11
>>>>>>> 6)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scal
>>>>>>> a:
>>>>>>> 11
>>>>>>> 6)
>>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>> at
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>> at
>>>>>>>>
>>>>>
>>>
>>>>>>> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:
>>>>>>> 20
>>>>>>> 6)
>>>>>>>> at org.apache.samza.container.RunLoop.send(RunLoop.scala:116)
>>>>>>>> at org.apache.samza.container.RunLoop.run(RunLoop.scala:59)
>>>>>>>> at
>>>>>>>
>>>
>>>>> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
>>>>>>>> at
>>>>>>>>
>>>>>
>>>
>>>>>>> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:
>>>>>>> 81
>>>>>>> )
>>>>>>>> at
>>>>>
>>>>> org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 7, 2014 at 3:12 PM, Telles Nobrega
>>>>>>> <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Chris, I really appreciate the time you are taking to help
>>> me
>>>>>>> out.
>>>>>>>>>
>>>>>>>>> This is job.properties file
>>>>>>>>>
>>>>>>>>> # Job
>>>>>>>>> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>>>>>>>>> job.name=consumptions
>>>>>>>>>
>>>>>>>>> # YARN
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> yarn.package.path=file://${basedir}/target/${project.artifactId}-${
>>>>> pom.ve
>>>>>>> rsion}-dist.tar.gz
>>>>>>>>>
>>>>>>>>> # Task
>>>>>>>>> task.class=alarm.ConsumptionProducer
>>>>>>>>> task.inputs=kafka.consumptions
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckp
>>>>>>> oi
>>>>>>> nt
>>>>>>> ManagerFactory
>>>>>>>>> task.checkpoint.system=kafka
>>>>>>>>> # Normally, this would be 3, but we have only one broker.
>>>>>>>>> task.checkpoint.replication.factor=1
>>>>>>>>>
>>>>>>>>> # Serializers
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> serializers.registry.serde.class=org.apache.samza.serializers.StringS
>>>>>>> er
>>>>>>> de
>>>>>>> Factory
>>>>>>>>>
>>>>>>>>> # Kafka System
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystem
>>>>>>> Fa
>>>>>>> ct
>>>>>>> ory
>>>>>>>>> *systems.kafka.samza.msg.serde=json*
>>>>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181/
>>>>>>>>> systems.kafka.consumer.auto.offset.reset=largest
>>>>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>>>>>> systems.kafka.producer.producer.type=sync
>>>>>>>>> # Normally, we'd set this much higher, but we want things to
>>> look
>>>>>>> snappy
>>>>>>>>> in the demo.
>>>>>>>>> systems.kafka.producer.batch.num.messages=1
>>>>>>>>>
>>>>>>>>> *systems.kafka.streams.consumptions.key.serde=string*
>>>>>>>>> *systems.kafka.streams.consumptions.msg.serde=string*
>>>>>>>>>
>>>>>>>>> Does this look right?
>>>>>>>>> I'm running a local cluster, I want to have it running nicely
>>>>>>> before I
>>>>>>>> can
>>>>>>>>> distribute it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 7, 2014 at 3:08 PM, Chris Riccomini <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Telles,
>>>>>>>>>>
>>>>>>>>>> Sure. In your job.properties file, define the serde:
>>>>>>>>>>
>>>>>>>>>> # Serializers
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>> serializers.registry.serde.class=org.apache.samza.serializers.StringS
>>>>>>> er
>>>>>>> de
>>>>>>> Fa
>>>>>>>>>> ctory
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Then define the serde for your system:
>>>>>>>>>>
>>>>>>>>>> systems.kafka.samza.msg.serde=string
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Chris
>>>>>>>>>>
>>>>>>>>>> On 8/7/14 10:54 AM, "Telles Nobrega"
>>> <[email protected]>
>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you give and example on how to use string serde, i'm
>>>>> getting
>>>>>>> an
>>>>>>>> error
>>>>>>>>>>> when trying to set to string
>>>>>>>>>>>
>>>>>>>>>>> :53:26:804Got system producers: Set(kafka)
>>>>>>>>>>> 17:53:26:809Got serdes: Set(string)
>>>>>>>>>>> 17:53:29:206Container container_1407433587782_0001_01_000017
>>>>>>> failed
>>>>>>>> with
>>>>>>>>>>> exit code 1 - Exception from container-launch:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Aug 7, 2014 at 2:41 PM, Telles Nobrega <
>>>>>>>> [email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Aug 7, 2014 at 1:54 PM, Chris Riccomini <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hey Telles,
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is definitely a serde error. It sounds like your
>>>>> message
>>>>>>> is
>>>>>>> not
>>>>>>>>>>>>> properly formatted as a JSON blob.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you are trying to just use a string as the message
>>> (vs. a
>>>>>>> well
>>>>>>>>>>>>> formatted JSON blob), then you should use the
>>> StringSerde.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 8/7/14 8:05 AM, "Telles Nobrega"
>>>>> <[email protected]>
>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi, I'm running a simple samza topology that reads from
>>> a
>>>>>>> kafka
>>>>>>>>>> topic
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>> only has two Strings
>>>>>>>>>>>>>> xx:xx:xx:xxxx;xx
>>>>>>>>>>>>>> And its throwing an error
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Caught exception in process loop.
>>>>>>>>>>>>>> org.codehaus.jackson.JsonParseException: Unexpected
>>>>> character
>>>>>>> ('F'
>>>>>>>>>>>>> (code
>>>>>>>>>>>>>> 70)): expected a valid value (number, String, array,
>>>>> object,
>>>>>>>> 'true',
>>>>>>>>>>>>>> 'false' or 'null')
>>>>>>>>>>>>>> at [Source: [B@56dfb465; line: 1, column: 2]
>>>>>>>>>>>>>> at
>>>>>>>>>>
>>>>>>>
>>>
>>>>>>>> org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291
>>>>>>>> )
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(Json
>>>>>>>>>>> Pa
>>>>>>>>>>> rs
>>>>>>>>>>> er
>>>>>>>>>>>>>> Min
>>>>>>>>>>>>>> imalBase.java:385)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpected
>>>>>>>>>>> Ch
>>>>>>>>>>> ar
>>>>>>>>>>> (J
>>>>>>>>>>>>>> son
>>>>>>>>>>>>>> ParserMinimalBase.java:306)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue
>>>>>>>>>>> (U
>>>>>>>>>>> tf
>>>>>>>>>>> 8S
>>>>>>>>>>>>>> tre
>>>>>>>>>>>>>> amParser.java:1581)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(
>>>>>>>>>>> Ut
>>>>>>>>>>> f8
>>>>>>>>>>> St
>>>>>>>>>>>>>> rea
>>>>>>>>>>>>>> mParser.java:436)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamPa
>>>>>>>>>>> rs
>>>>>>>>>>> er
>>>>>>>>>>> .j
>>>>>>>>>>>>>> ava
>>>>>>>>>>>>>> :322)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMappe
>>>>>>>>>>> r.
>>>>>>>>>>> ja
>>>>>>>>>>> va
>>>>>>>>>>>>>> :24
>>>>>>>>>>>>>> 32)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapp
>>>>>>>>>>> er
>>>>>>>>>>> .j
>>>>>>>>>>> av
>>>>>>>>>>>>>> a:2
>>>>>>>>>>>>>> 389)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>
>>>>> org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>>>>>>>>>>>>>> at
>>>>>>>>>>
>>>>>>>
>>>
>>>>>>>> org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.
>>>>>>>>>>> sc
>>>>>>>>>>> al
>>>>>>>>>>> a:
>>>>>>>>>>>>>> 115
>>>>>>>>>>>>>> )
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza
>>>>>>>>>>> $s
>>>>>>>>>>> ys
>>>>>>>>>>> te
>>>>>>>>>>>>>> m$S
>>>>>>>>>>>>>> ystemConsumers$$poll$5.apply(SystemConsumers.scala:245)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza
>>>>>>>>>>> $s
>>>>>>>>>>> ys
>>>>>>>>>>> te
>>>>>>>>>>>>>> m$S
>>>>>>>>>>>>>> ystemConsumers$$poll$5.apply(SystemConsumers.scala:242)
>>>>>>>>>>>>>> at
>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>> at
>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>>>> at
>>>>>>>>
>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>>>>>>> at
>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>>>>>> at org.apache.samza.system.SystemConsumers.org
>>>>>>>>>>>>>
>>>>>>>>
>>>>>> $apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:242)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.a
>>>>>>>>>>> pp
>>>>>>>>>>> ly
>>>>>>>>>>> (S
>>>>>>>>>>>>>> yst
>>>>>>>>>>>>>> emConsumers.scala:180)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.a
>>>>>>>>>>> pp
>>>>>>>>>>> ly
>>>>>>>>>>> (S
>>>>>>>>>>>>>> yst
>>>>>>>>>>>>>> emConsumers.scala:180)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
>>>>>>>>>>> Li
>>>>>>>>>>> ke
>>>>>>>>>>> .s
>>>>>>>>>>>>>> cal
>>>>>>>>>>>>>> a:244)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
>>>>>>>>>>> Li
>>>>>>>>>>> ke
>>>>>>>>>>> .s
>>>>>>>>>>>>>> cal
>>>>>>>>>>>>>> a:244)
>>>>>>>>>>>>>> at
>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>> at
>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>>>> at
>>>>>>>>
>>> scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>>>>>>>>>>>>>> at
>>>>>>>>>>
>>>>>>>
>>>
>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244
>>>>>>>> )
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> scala.collection.AbstractSet.scala$collection$SetLike$$super$map(
>>>>>>>>>>> Se
>>>>>>>>>>> t.
>>>>>>>>>>> sc
>>>>>>>>>>>>>> ala
>>>>>>>>>>>>>> :47)
>>>>>>>>>>>>>> at scala.collection.SetLike$class.map(SetLike.scala:93)
>>>>>>>>>>>>>> at scala.collection.AbstractSet.map(Set.scala:47)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.apache.samza.system.SystemConsumers$$anon$1.call(SystemConsum
>>>>>>>>>>> er
>>>>>>>>>>> s.
>>>>>>>>>>> sc
>>>>>>>>>>>>>> ala
>>>>>>>>>>>>>> :180)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.apache.samza.util.DoublingBackOff.maybeCall(DoublingBackOff.s
>>>>>>>>>>> ca
>>>>>>>>>>> la
>>>>>>>>>>> :4
>>>>>>>>>>>>>> 4)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.apache.samza.system.SystemConsumers.choose(SystemConsumers.sc
>>>>>>>>>>> al
>>>>>>>>>>> a:
>>>>>>>>>>> 20
>>>>>>>>>>>>>> 8)
>>>>>>>>>>>>>> at
>>>>>>> org.apache.samza.container.RunLoop.process(RunLoop.scala:73)
>>>>>>>>>>>>>> at
>>> org.apache.samza.container.RunLoop.run(RunLoop.scala:57)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scal
>>>>>>>>>>> a:
>>>>>>>>>>> 50
>>>>>>>>>>> 4)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>>>>>>>>>> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.sc
>>>>>>>>>>> al
>>>>>>>>>>> a:
>>>>>>>>>>> 81
>>>>>>>>>>>>>> )
>>>>>>>>>>>>>> at
>>>>>>>>>>
>>>>>>>
>>>
>>>>>>>> org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Has anyone experienced this error before?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> ------------------------------------------
>>>>>>>>>>>>>> Telles Mota Vidal Nobrega
>>>>>>>>>>>>>> M.sc. Candidate at UFCG
>>>>>>>>>>>>>> B.sc. in Computer Science at UFCG
>>>>>>>>>>>>>> Software Engineer at OpenStack Project - HP/LSD-UFCG
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> ------------------------------------------
>>>>>>>>>>>> Telles Mota Vidal Nobrega
>>>>>>>>>>>> M.sc. Candidate at UFCG
>>>>>>>>>>>> B.sc. in Computer Science at UFCG
>>>>>>>>>>>> Software Engineer at OpenStack Project - HP/LSD-UFCG
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> ------------------------------------------
>>>>>>>>>>> Telles Mota Vidal Nobrega
>>>>>>>>>>> M.sc. Candidate at UFCG
>>>>>>>>>>> B.sc. in Computer Science at UFCG
>>>>>>>>>>> Software Engineer at OpenStack Project - HP/LSD-UFCG
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> ------------------------------------------
>>>>>>>>> Telles Mota Vidal Nobrega
>>>>>>>>> M.sc. Candidate at UFCG
>>>>>>>>> B.sc. in Computer Science at UFCG
>>>>>>>>> Software Engineer at OpenStack Project - HP/LSD-UFCG
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> ------------------------------------------
>>>>>>>> Telles Mota Vidal Nobrega
>>>>>>>> M.sc. Candidate at UFCG
>>>>>>>> B.sc. in Computer Science at UFCG
>>>>>>>> Software Engineer at OpenStack Project - HP/LSD-UFCG
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> ------------------------------------------
>>>>>> Telles Mota Vidal Nobrega
>>>>>> M.sc. Candidate at UFCG
>>>>>> B.sc. in Computer Science at UFCG
>>>>>> Software Engineer at OpenStack Project - HP/LSD-UFCG
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ------------------------------------------
>>>> Telles Mota Vidal Nobrega
>>>> M.sc. Candidate at UFCG
>>>> B.sc. in Computer Science at UFCG
>>>> Software Engineer at OpenStack Project - HP/LSD-UFCG
>>>
>>>
>>
>>
>> --
>> ------------------------------------------
>> Telles Mota Vidal Nobrega
>> M.sc. Candidate at UFCG
>> B.sc. in Computer Science at UFCG
>> Software Engineer at OpenStack Project - HP/LSD-UFCG
>