Hi Joe, I just tried, it works! Now I need to think about how to design the partition function.
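For example, a content-based partition function over these Array[Byte] keys could hash the key bytes and reduce modulo the partition count. This is only a sketch (the name KeyPartitioner is illustrative, not from this thread); with the 0.8 producer the real hook would be a class implementing kafka.producer.Partitioner, registered through the "partitioner.class" property:

```scala
// Minimal sketch of a content-based partition function for Array[Byte] keys.
// java.util.Arrays.hashCode hashes the byte contents; calling .hashCode on
// the array directly would hash the object identity instead, so equal keys
// in different array instances would land on different partitions.
object KeyPartitioner {
  def partition(key: Array[Byte], numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    // Mask off the sign bit so the modulo result is always in [0, numPartitions).
    (java.util.Arrays.hashCode(key) & Int.MaxValue) % numPartitions
  }
}
```

For instance, KeyPartitioner.partition("2".getBytes("UTF-8"), 4) always falls in the range 0 to 3 and is stable for equal key bytes, which is the property a key-routing scheme needs.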
Thanks!
Haoming

> Date: Thu, 20 Nov 2014 20:44:29 -0500
> Subject: Re: Partition Key Cannot be Send Out by Producer
> From: joe.st...@stealth.ly
> To: users@kafka.apache.org
>
> Yes, that was what I was thinking. You don't need to set the serializer
> class if you want Array[Byte]; that is the default. Remove the line
>
>     c.put("key.serializer.class", "kafka.serializer.StringEncoder")
>
> and you should either see it work or have to work through the next issue,
> hopefully the former =8^)
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
>
> On Thu, Nov 20, 2014 at 8:42 PM, Haoming Zhang <haoming.zh...@outlook.com>
> wrote:
>
> > Hi Joe,
> >
> > You reminded me; maybe I included the incorrect serializer.
> >
> > Here is how I created the producer:
> >
> >     And(s"a synchronous Kafka producer app that writes to the topic $inputTopic")
> >     val producerApp = {
> >       val config = {
> >         val c = new Properties
> >         c.put("producer.type", "sync")
> >         c.put("client.id", "kafka-spark-streaming-test-sync-producer")
> >         c.put("request.required.acks", "1")
> >         c.put("key.serializer.class", "kafka.serializer.StringEncoder")
> >         c
> >       }
> >       kafkaZkCluster.createProducer(inputTopic.name, config).get
> >     }
> >
> > I have included the "key.serializer.class", but I'm not sure whether I
> > did it correctly...
> >
> > The following is the error message:
> >
> > 14/11/20 17:08:11 INFO KafkaSparkStreamingSpec: Synchronously sending
> > Tweet {"username": "ANY_USER_1", "text": "ANY_TEXT_1", "timestamp":
> > 1416532086} to topic Some(testingInput)
> > [B@3eb78a85
> > KeyedMessage(testingInput,[B@3eb78a85,[B@3eb78a85,[B@3bf54b00)
> > msg.key: [B@3eb78a85
> > msg.message: [B@3bf54b00
> > msg.partKey: [B@3eb78a85
> > msg.topic: testingInput
> > java.lang.ClassCastException: [B cannot be cast to java.lang.String
> >   at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
> >   at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:128)
> >   at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> >   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> >   at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)
> >   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)
> >   at kafka.producer.Producer.send(Producer.scala:76)
> >   at com.cisco.npa.kafka.KafkaProducerApp.send(KafkaProducerApp.scala:87)
> >   at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(KafkaSparkStreamingSpec.scala:171)
> >   at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(KafkaSparkStreamingSpec.scala:153)
> >   at scala.collection.immutable.List.foreach(List.scala:318)
> >   at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(KafkaSparkStreamingSpec.scala:153)
> >   at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KafkaSparkStreamingSpec.scala:117)
> >   at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KafkaSparkStreamingSpec.scala:117)
> >   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
> >   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> >   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> >   at org.scalatest.Transformer.apply(Transformer.scala:22)
> >   at org.scalatest.Transformer.apply(Transformer.scala:20)
> >   at org.scalatest.FeatureSpecLike$$anon$1.apply(FeatureSpecLike.scala:199)
> >   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
> >   at org.scalatest.FeatureSpec.withFixture(FeatureSpec.scala:1836)
> >   at org.scalatest.FeatureSpecLike$class.invokeWithFixture$1(FeatureSpecLike.scala:196)
> >   at org.scalatest.FeatureSpecLike$$anonfun$runTest$1.apply(FeatureSpecLike.scala:208)
> >   at org.scalatest.FeatureSpecLike$$anonfun$runTest$1.apply(FeatureSpecLike.scala:208)
> >   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
> >   at org.scalatest.FeatureSpecLike$class.runTest(FeatureSpecLike.scala:208)
> >   at com.cisco.npa.spark.KafkaSparkStreamingSpec.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaSparkStreamingSpec.scala:35)
> >   at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
> >   at com.cisco.npa.spark.KafkaSparkStreamingSpec.runTest(KafkaSparkStreamingSpec.scala:35)
> >   at org.scalatest.FeatureSpecLike$$anonfun$runTests$1.apply(FeatureSpecLike.scala:267)
> >   at org.scalatest.FeatureSpecLike$$anonfun$runTests$1.apply(FeatureSpecLike.scala:267)
> >   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
> >   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
> >   at scala.collection.immutable.List.foreach(List.scala:318)
> >   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
> >   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
> >   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
> >   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
> >   at scala.collection.immutable.List.foreach(List.scala:318)
> >   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
> >   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
> >   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
> >   at org.scalatest.FeatureSpecLike$class.runTests(FeatureSpecLike.scala:267)
> >   at org.scalatest.FeatureSpec.runTests(FeatureSpec.scala:1836)
> >   at org.scalatest.Suite$class.run(Suite.scala:1424)
> >   at org.scalatest.FeatureSpec.org$scalatest$FeatureSpecLike$$super$run(FeatureSpec.scala:1836)
> >   at org.scalatest.FeatureSpecLike$$anonfun$run$1.apply(FeatureSpecLike.scala:309)
> >   at org.scalatest.FeatureSpecLike$$anonfun$run$1.apply(FeatureSpecLike.scala:309)
> >   at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
> >   at org.scalatest.FeatureSpecLike$class.run(FeatureSpecLike.scala:309)
> >   at org.scalatest.FeatureSpec.run(FeatureSpec.scala:1836)
> >   at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
> >   at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
> >   at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
> >   at scala.collection.immutable.List.foreach(List.scala:318)
> >   at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
> >   at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
> >   at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
> >   at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
> >   at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
> >   at org.scalatest.tools.Runner$.main(Runner.scala:860)
> >   at org.scalatest.tools.Runner.main(Runner.scala)
> >   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >   at java.lang.reflect.Method.invoke(Method.java:606)
> >   at scala.tools.eclipse.scalatest.launching.ScalaTestLauncher$.main(ScalaTestLauncher.scala:58)
> >   at scala.tools.eclipse.scalatest.launching.ScalaTestLauncher.main(ScalaTestLauncher.scala)
> >
> > Thanks,
> > Haoming
> >
> > > Date: Thu, 20 Nov 2014 20:29:44 -0500
> > > Subject: Re: Partition Key Cannot be Send Out by Producer
> > > From: joe.st...@stealth.ly
> > > To: users@kafka.apache.org
> > >
> > > It would be helpful to see the full stack trace. Also, how have you
> > > instantiated your Producer class? Did you set a value for
> > > "serializer.class" in the properties?
> > >
> > > /*******************************************
> > >  Joe Stein
> > >  Founder, Principal Consultant
> > >  Big Data Open Source Security LLC
> > >  http://www.stealth.ly
> > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > ********************************************/
> > >
> > > On Thu, Nov 20, 2014 at 8:15 PM, Haoming Zhang <haoming.zh...@outlook.com>
> > > wrote:
> > >
> > > > Hi Harsha,
> > > >
> > > > I just tried to hard-code a string message and then convert the
> > > > message to a byte array, but no luck...
> > > >
> > > > The following is how my program works:
> > > >
> > > > Create a hard-coded key, which is a String, and convert it to a byte
> > > > array; then iterate over the network messages and send them one by one:
> > > >
> > > >     networkelements foreach {
> > > >       case networkelement =>
> > > >         val bytes = Injection(networkelement)
> > > >         logger.info(s"Synchronously sending Tweet $networkelement to topic ${producerApp.defaultTopic}")
> > > >
> > > >         val hardKey = "2"
> > > >         val parkey = hardKey.getBytes("UTF8")
> > > >         val topic = producerApp.defaultTopic
> > > >         producerApp.send(parkey, bytes, topic)
> > > >     }
> > > >
> > > > Here is how the networkelements are created, where NetworkElement is a
> > > > class generated by Avro; I think you can ignore it:
> > > >
> > > >     val networkelements = fixture.messages
> > > >
> > > >     val fixture = {
> > > >       val BeginningOfEpoch = 0.seconds
> > > >       val AnyTimestamp = 1234.seconds
> > > >       val now = System.currentTimeMillis().millis
> > > >
> > > >       new {
> > > >         val t1 = new NetworkElement("ANY_USER_1", "ANY_TEXT_1", now.toSeconds)
> > > >         val t2 = new NetworkElement("ANY_USER_2", "ANY_TEXT_2", BeginningOfEpoch.toSeconds)
> > > >         val t3 = new NetworkElement("ANY_USER_3", "ANY_TEXT_3", AnyTimestamp.toSeconds)
> > > >
> > > >         val messages = Seq(t1, t2, t3)
> > > >       }
> > > >     }
> > > >
> > > > BTW, I defined the Key and Val types as follows:
> > > >
> > > >     type Key = Array[Byte]
> > > >     type Val = Array[Byte]
> > > >
> > > > Thanks,
> > > > Haoming
> > > >
> > > > > From: ka...@harsha.io
> > > > > To: users@kafka.apache.org
> > > > > Subject: Re: Partition Key Cannot be Send Out by Producer
> > > > > Date: Thu, 20 Nov 2014 16:59:19 -0800
> > > > >
> > > > > Also, the (key: Key, value: Val, topic: Option[String]) "value"
> > > > > should be a string converted to a byte array.
> > > > > Can you send an example of your key and value data?
> > > > >
> > > > > On Thu, Nov 20, 2014, at 04:53 PM, Haoming Zhang wrote:
> > > > > > Hi Harsha,
> > > > > >
> > > > > > Thanks for the suggestion!
> > > > > >
> > > > > > I have checked this link before, and I tried to create the
> > > > > > partition key like the following:
> > > > > >
> > > > > >     val hardKey = "2"
> > > > > >     val parkey = hardKey.getBytes("UTF8")
> > > > > >
> > > > > > But I still get the same exception. I also tried setting "UTF8" to
> > > > > > "UTF-8", but no luck...
> > > > > >
> > > > > > Regards,
> > > > > > Haoming
> > > > > >
> > > > > > > From: ka...@harsha.io
> > > > > > > To: users@kafka.apache.org
> > > > > > > Subject: Re: Partition Key Cannot be Send Out by Producer
> > > > > > > Date: Thu, 20 Nov 2014 16:43:11 -0800
> > > > > > >
> > > > > > > Hi Haoming,
> > > > > > > Take a look at the code here:
> > > > > > > https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaProducer.scala
> > > > > > > Your partKey should be a string, and when you convert it into a
> > > > > > > byte array you can use partKey.getBytes("UTF8").
> > > > > > > -Harsha
> > > > > > >
> > > > > > > On Thu, Nov 20, 2014, at 03:57 PM, Haoming Zhang wrote:
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I'm a beginner with Kafka, and currently I'm stuck on how to
> > > > > > > > send out a KeyedMessage with a producer.
> > > > > > > > I would like to design a partition function to route messages
> > > > > > > > based on the key, but the producer cannot send the KeyedMessage
> > > > > > > > and I get this exception:
> > > > > > > >
> > > > > > > >     java.lang.ClassCastException: [B cannot be cast to java.lang.String
> > > > > > > >
> > > > > > > > What I tried is to hardcode a partition key (I tried String and
> > > > > > > > Integer; currently it is Integer), then convert the partition
> > > > > > > > key to a byte array:
> > > > > > > >
> > > > > > > >     val converter = new DataTypeConvertion
> > > > > > > >     val hardKey = 2
> > > > > > > >     val partkey = converter.intToByteArray(hardKey)
> > > > > > > >
> > > > > > > > Then create a KeyedMessage with the following function:
> > > > > > > >
> > > > > > > >     private def toMessage(value: Val, key: Option[Key] = None,
> > > > > > > >         topic: Option[String] = None): KeyedMessage[Key, Val] = {
> > > > > > > >       val t = topic.get
> > > > > > > >       require(!t.isEmpty, "Topic must not be empty")
> > > > > > > >       key match {
> > > > > > > >         case Some(key) => new KeyedMessage(t, key, value)
> > > > > > > >         case _ => new KeyedMessage(t, value)
> > > > > > > >       }
> > > > > > > >     }
> > > > > > > >
> > > > > > > > Then try to send the KeyedMessage with a Kafka producer:
> > > > > > > >
> > > > > > > >     def send(key: Key, value: Val, topic: Option[String] = None) {
> > > > > > > >       val msg = toMessage(value, Option(key), topic)
> > > > > > > >       print(msg + "\n")
> > > > > > > >       print("msg.key: " + msg.key + "\n")
> > > > > > > >       print("msg.message: " + msg.message + "\n")
> > > > > > > >       print("msg.partKey: " + msg.partKey + "\n")
> > > > > > > >       print("msg.topic: " + msg.topic + "\n")
> > > > > > > >       try {
> > > > > > > >         p.send(msg) // p is an instance of Producer; the exception happens on this line
> > > > > > > >       } catch {
> > > > > > > >         case e: Exception =>
> > > > > > > >           e.printStackTrace()
> > > > > > > >           System.exit(1)
> > > > > > > >       }
> > > > > > > >     }
> > > > > > > >
> > > > > > > > As you can see, I added many print statements to the above
> > > > > > > > function, and the following is its output:
> > > > > > > >
> > > > > > > >     KeyedMessage(testingInput,[B@7ad40950,[B@7ad40950,[B@7ce18764)
> > > > > > > >     msg.key: [B@7ad40950
> > > > > > > >     msg.message: [B@7ce18764
> > > > > > > >     msg.partKey: [B@7ad40950
> > > > > > > >     msg.topic: testingInput
> > > > > > > >
> > > > > > > > The key of the KeyedMessage is displayed as [B@7ad40950, which I
> > > > > > > > think is a memory address, and the exception
> > > > > > > > (java.lang.ClassCastException: [B cannot be cast to
> > > > > > > > java.lang.String) happens when the "send" function tries to
> > > > > > > > convert the byte array to a String.
> > > > > > > >
> > > > > > > > Am I wrong to create a key of type Array[Byte]?
> > > > > > > > Some examples of how to use KeyedMessage would be great!
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Haoming
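To summarize the root cause the thread converges on, here is a runnable sketch. StringEncoderLike and DefaultEncoderLike are simplified, made-up stand-ins for kafka.serializer.StringEncoder and kafka.serializer.DefaultEncoder (they are not the Kafka classes themselves): the string encoder casts the key to String, which is exactly the cast that fails for an Array[Byte] key, while the default pass-through encoder is why removing the key.serializer.class setting makes the producer work.

```scala
// The 0.8 producer serializes keys with the configured key.serializer.class.
// A StringEncoder-style serializer casts the key to String, so an
// Array[Byte] key throws ClassCastException; a DefaultEncoder-style
// serializer simply passes the bytes through. Simplified stand-ins:
object StringEncoderLike {
  def toBytes(key: Any): Array[Byte] =
    // Throws ClassCastException ("[B cannot be cast to java.lang.String")
    // when handed a byte array, reproducing the error in the thread.
    key.asInstanceOf[String].getBytes("UTF-8")
}

object DefaultEncoderLike {
  def toBytes(key: Any): Array[Byte] =
    // Pass-through: the behaviour you get with no key.serializer.class set.
    key.asInstanceOf[Array[Byte]]
}
```

With these stand-ins, DefaultEncoderLike.toBytes("2".getBytes("UTF-8")) returns the bytes unchanged, while StringEncoderLike.toBytes on the same key throws the ClassCastException quoted above.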