Hi LiYuan,

Thank you very much for your response. The problem has been solved. It turned out that a lot of messages were created at almost the same time (even at millisecond resolution), so rows keyed by the timestamp shared the same primary key and overwrote one another in Cassandra. I changed the key to use "UUID.randomUUID()", with which all messages can be inserted into the Cassandra table without colliding.
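For reference, here is a minimal sketch of the change, reusing ssc, zkeeper, groupId, and numConsumers from my original snippet quoted below. The table and column names are kept from that snippet; in practice the key column would of course have to hold a uuid/text value rather than a timestamp.

import java.util.UUID
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._

@transient val kstreams = (1 to numConsumers.toInt).map { _ =>
  KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA" -> 1),
    StorageLevel.MEMORY_AND_DISK_SER)
    .map(_._2.toString)
    // Key each log by a random UUID instead of System.currentTimeMillis(),
    // so messages arriving within the same millisecond no longer share a
    // primary key and overwrite each other.
    .map(log => (UUID.randomUUID().toString, log))
}
@transient val unifiedMessage = ssc.union(kstreams)

unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
  SomeColumns("createdtime", "log"))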
Regards,
Jerry Wong

On Wed, Feb 17, 2016 at 9:43 PM, yuanjia8...@163.com <yuanjia8...@163.com> wrote:
> Hi Jerry,
> 1. Make sure that 1000 messages have been sent to Kafka before consuming.
> 2. If you don't care about the sequence between messages, you can use
> multiple partitions and more consumers.
>
> LiYuanJia
>
> From: Jerry Wong
> Date: 2016-02-17 05:33
> To: users
> Subject: Optimize the performance of inserting data to Cassandra with
> Kafka and Spark Streaming
>
> Hello Everybody,
>
> I have a question about using Spark Streaming to consume data from Kafka
> and insert it into a Cassandra database, but I am not sure whether I should
> post it in the Kafka users mailing list. I would appreciate any suggestions.
>
> 5 AWS instances (each with 8 cores, 30GB memory) for Spark, Hadoop, Cassandra
> Scala: 2.10.5
> Spark: 1.2.2
> Hadoop: 1.2.1
> Cassandra: 2.0.18
>
> 3 AWS instances for the Kafka cluster (each with 8 cores, 30GB memory)
> Kafka: 0.8.2.1
> Zookeeper: 3.4.6
>
> Other configurations:
> batchInterval = 6 seconds
> blockInterval = 1500 millis
> spark.locality.wait = 500 millis
> #Consumers = 10
>
> There are two columns in the Cassandra table
> keySpaceOfTopicA.tableOfTopicA: "createdtime" and "log".
>
> Here is a piece of the code:
>
> @transient val kstreams = (1 to numConsumers.toInt).map { _ =>
>   KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA" -> 1),
>     StorageLevel.MEMORY_AND_DISK_SER)
>     .map(_._2.toString).map(Tuple1(_))
>     .map { case Tuple1(log) => (System.currentTimeMillis(), log) }
> }
> @transient val unifiedMessage = ssc.union(kstreams)
>
> unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
>   SomeColumns("createdtime", "log"))
>
> I created a producer and sent messages to the brokers (1000 messages per run).
>
> But only about 100 messages get inserted into Cassandra in each round of
> the test.
> Can anybody advise why the other messages (about 900) cannot be consumed?
> How do I configure and tune the parameters in order to improve the
> throughput of the consumers?
>
> Thank you very much in advance for reading this and for your suggestions.
>
> Jerry Wong