Hi Joe,
You reminded me: maybe I configured the wrong serializer.
Here is how I created the producer:
And(s"a synchronous Kafka producer app that writes to the topic
$inputTopic")
val producerApp = {
val config = {
val c = new Properties
c.put("producer.type", "sync")
c.put("client.id", "kafka-spark-streaming-test-sync-producer")
c.put("request.required.acks", "1")
c.put("key.serializer.class", "kafka.serializer.StringEncoder")
c
}
kafkaZkCluster.createProducer(inputTopic.name, config).get
}
I have set the "key.serializer.class" property, but I'm not sure whether I
did it correctly.
The following is the error message:
14/11/20 17:08:11 INFO KafkaSparkStreamingSpec: Synchronously sending Tweet
{"username": "ANY_USER_1", "text": "ANY_TEXT_1", "timestamp": 1416532086} to
topic Some(testingInput)
[B@3eb78a85
KeyedMessage(testingInput,[B@3eb78a85,[B@3eb78a85,[B@3bf54b00)
msg.key: [B@3eb78a85
msg.message: [B@3bf54b00
msg.partKey: [B@3eb78a85
msg.topic: testingInput
java.lang.ClassCastException: [B cannot be cast to java.lang.String
at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
at
kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:128)
at
kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)
at kafka.producer.Producer.send(Producer.scala:76)
at com.cisco.npa.kafka.KafkaProducerApp.send(KafkaProducerApp.scala:87)
at
com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(KafkaSparkStreamingSpec.scala:171)
at
com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(KafkaSparkStreamingSpec.scala:153)
at scala.collection.immutable.List.foreach(List.scala:318)
at
com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(KafkaSparkStreamingSpec.scala:153)
at
com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KafkaSparkStreamingSpec.scala:117)
at
com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KafkaSparkStreamingSpec.scala:117)
at
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FeatureSpecLike$$anon$1.apply(FeatureSpecLike.scala:199)
at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
at org.scalatest.FeatureSpec.withFixture(FeatureSpec.scala:1836)
at
org.scalatest.FeatureSpecLike$class.invokeWithFixture$1(FeatureSpecLike.scala:196)
at
org.scalatest.FeatureSpecLike$$anonfun$runTest$1.apply(FeatureSpecLike.scala:208)
at
org.scalatest.FeatureSpecLike$$anonfun$runTest$1.apply(FeatureSpecLike.scala:208)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.FeatureSpecLike$class.runTest(FeatureSpecLike.scala:208)
at
com.cisco.npa.spark.KafkaSparkStreamingSpec.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaSparkStreamingSpec.scala:35)
at
org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
at
com.cisco.npa.spark.KafkaSparkStreamingSpec.runTest(KafkaSparkStreamingSpec.scala:35)
at
org.scalatest.FeatureSpecLike$$anonfun$runTests$1.apply(FeatureSpecLike.scala:267)
at
org.scalatest.FeatureSpecLike$$anonfun$runTests$1.apply(FeatureSpecLike.scala:267)
at
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
at
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
at
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
at
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
at org.scalatest.FeatureSpecLike$class.runTests(FeatureSpecLike.scala:267)
at org.scalatest.FeatureSpec.runTests(FeatureSpec.scala:1836)
at org.scalatest.Suite$class.run(Suite.scala:1424)
at
org.scalatest.FeatureSpec.org$scalatest$FeatureSpecLike$$super$run(FeatureSpec.scala:1836)
at
org.scalatest.FeatureSpecLike$$anonfun$run$1.apply(FeatureSpecLike.scala:309)
at
org.scalatest.FeatureSpecLike$$anonfun$run$1.apply(FeatureSpecLike.scala:309)
at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
at org.scalatest.FeatureSpecLike$class.run(FeatureSpecLike.scala:309)
at org.scalatest.FeatureSpec.run(FeatureSpec.scala:1836)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
at
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
at
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
at
org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
at
org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
at
org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
at
org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
at org.scalatest.tools.Runner$.main(Runner.scala:860)
at org.scalatest.tools.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
scala.tools.eclipse.scalatest.launching.ScalaTestLauncher$.main(ScalaTestLauncher.scala:58)
at
scala.tools.eclipse.scalatest.launching.ScalaTestLauncher.main(ScalaTestLauncher.scala)
Thanks,
Haoming
> Date: Thu, 20 Nov 2014 20:29:44 -0500
> Subject: Re: Partition Key Cannot be Send Out by Producer
> From: [email protected]
> To: [email protected]
>
> It would be helpful to see the full stack trace. Also, how have you
> instantiated your Producer class? Did you set a value for
> "serializer.class" in the property?
>
> /*******************************************
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
>
> On Thu, Nov 20, 2014 at 8:15 PM, Haoming Zhang <[email protected]>
> wrote:
>
> > Hi Harsha,
> >
> > I just tried to hard-code a string message and then convert the message to
> > a byte array, but no luck...
> >
> > The following is how my program works:
> >
> > Create a hardcode key, which is String, then convert to byte array,
> > iterate a network message, send the message one by one:
> > networkelements foreach {
> > case networkelement =>
> > val bytes = Injection(networkelement)
> > logger.info(s"Synchronously sending Tweet $networkelement to
> > topic ${producerApp.defaultTopic}")
> >
> > val hardKey = "2"
> > val parkey = hardKey.getBytes("UTF8")
> > val topic = producerApp.defaultTopic
> > producerApp.send(parkey, bytes, topic)
> > }
> >
> >
> > Here is how the networkelements are created, where NetworkElement is a class
> > generated by Avro; I think you can ignore it:
> > val networkelements = fixture.messages
> >
> > val fixture = {
> > val BeginningOfEpoch = 0.seconds
> > val AnyTimestamp = 1234.seconds
> > val now = System.currentTimeMillis().millis
> >
> > new {
> > val t1 = new NetworkElement("ANY_USER_1", "ANY_TEXT_1",
> > now.toSeconds)
> > val t2 = new NetworkElement("ANY_USER_2", "ANY_TEXT_2",
> > BeginningOfEpoch.toSeconds)
> > val t3 = new NetworkElement("ANY_USER_3", "ANY_TEXT_3",
> > AnyTimestamp.toSeconds)
> >
> > val messages = Seq(t1, t2, t3)
> > }
> > }
> >
> > BTW, I defined the Key and Val types as following:
> >
> > type Key = Array[Byte]
> > type Val = Array[Byte]
> >
> > Thanks,
> > Haoming
> >
> > > From: [email protected]
> > > To: [email protected]
> > > Subject: Re: Partition Key Cannot be Send Out by Producer
> > > Date: Thu, 20 Nov 2014 16:59:19 -0800
> > >
> > > also the (key: Key, value: Val, topic: Option[String]) "value" should be
> > > a string converted to a byte array.
> > > Can you send an example of your key and value data?
> > >
> > >
> > > On Thu, Nov 20, 2014, at 04:53 PM, Haoming Zhang wrote:
> > > > Hi Harsha,
> > > >
> > > > Thanks for suggestion!
> > > >
> > > > I have checked this link before, and I tried to create the partition
> > key
> > > > like the following:
> > > > val hardKey = "2"
> > > > val parkey = hardKey.getBytes("UTF8")
> > > >
> > > > But I still get the same exception. I also tried set "UTF8" as "UTF-8",
> > > > but no luck...
> > > >
> > > > Regards,
> > > > Haoming
> > > >
> > > > > From: [email protected]
> > > > > To: [email protected]
> > > > > Subject: Re: Partition Key Cannot be Send Out by Producer
> > > > > Date: Thu, 20 Nov 2014 16:43:11 -0800
> > > > >
> > > > > Hi Haoming,
> > > > > Take a look at the code here
> > > > >
> > https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaProducer.scala
> > > > > for your partKey it should be string and when you converting it into
> > > > > byte array you can use partKey.getBytes("UTF8")
> > > > > -Harsha
> > > > >
> > > > > On Thu, Nov 20, 2014, at 03:57 PM, Haoming Zhang wrote:
> > > > > > Hi all,
> > > > > >
> > > > > > > I'm a beginner with Kafka, and I'm currently stuck on how to send a
> > > > > > > KeyedMessage with a producer. I would like to design a partition
> > > > > > > function to route messages based on the key, but the producer cannot
> > > > > > > send the KeyedMessage and I get this exception:
> > > > > > java.lang.ClassCastException: [B cannot be cast to java.lang.String
> > > > > >
> > > > > > What I tried is hardcode a partition key ( I tried String and
> > Integer,
> > > > > > currently it is Integer ), then convert the partition key to Byte
> > Array:
> > > > > > val converter = new DataTypeConvertion
> > > > > > val hardKey = 2
> > > > > > val partkey = converter.intToByteArray(hardKey)
> > > > > >
> > > > > > Then create a KeyedMessage by the following function:
> > > > > >
> > > > > > private def toMessage(value: Val, key: Option[Key] = None, topic:
> > > > > > Option[String] = None): KeyedMessage[Key, Val] = {
> > > > > > val t = topic.get
> > > > > > require(!t.isEmpty, "Topic must not be empty")
> > > > > > key match {
> > > > > > case Some(key) => new KeyedMessage(t, key, value)
> > > > > > case _ => new KeyedMessage(t, value)
> > > > > > }
> > > > > > }
> > > > > >
> > > > > > Then try to send the KeyedMessage by a Kafka producer:
> > > > > >
> > > > > > def send(key: Key, value: Val, topic: Option[String] = None) {
> > > > > > val msg = toMessage(value, Option(key), topic)
> > > > > > print(msg + "\n")
> > > > > > print("msg.key" + msg.key + "\n")
> > > > > > print("msg.message" + msg.message + "\n")
> > > > > > print("msg.partKey" + msg.partKey + "\n")
> > > > > > print("msg.topic" + msg.topic + "\n")
> > > > > > try {
> > > > > > p.send(msg) //P is an instance of producer, exception
> > happens in
> > > > > > this line
> > > > > > } catch {
> > > > > > case e: Exception =>
> > > > > > e.printStackTrace()
> > > > > > System.exit(1)
> > > > > > }
> > > > > > }
> > > > > >
> > > > > > > As you can see, I added many print statements in the above function,
> > > > > > > and the following is the output of that function:
> > > > > > KeyedMessage(testingInput,[B@7ad40950,[B@7ad40950,[B@7ce18764)
> > > > > > msg.key: [B@7ad40950
> > > > > > msg.message: [B@7ce18764
> > > > > > msg.partKey: [B@7ad40950
> > > > > > msg.topic: testingInput
> > > > > >
> > > > > > The key of KeyedMessage is displayed as [B@7ad40950 , I think it
> > is a
> > > > > > memory address and the exception (java.lang.ClassCastException: [B
> > cannot
> > > > > > be cast to java.lang.String) happens when "send" function try to
> > convert
> > > > > > the Byte Array to String.
> > > > > >
> > > > > > Am I wrong on creating a key in Byte Array type?
> > > > > > Some examples of how to use KeyedMessage will be great!
> > > > > >
> > > > > > Regards,
> > > > > > Haoming
> > > > > >
> > > > > >
> > > >
> >
> >