Hi LiYuan,

Thank you very much for your response.
The problem is solved. Many messages were created at almost the same time
(even at millisecond resolution), so their timestamp keys collided. I changed
the key to "UUID.randomUUID()", and now all messages are inserted into the
Cassandra table without any time lag.
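
The effect described above can be reproduced without a cluster. The sketch
below is only an illustration: it assumes "createdtime" was the table's
primary key (the thread does not show the schema), and it uses an in-memory
map as a stand-in for Cassandra, where a write to an existing primary key
overwrites the row. The names KeyCollisionDemo and upsertAll are hypothetical.

```scala
import java.util.UUID
import scala.collection.mutable

object KeyCollisionDemo {
  // Stand-in for a Cassandra table (an assumption for illustration only):
  // writing a row whose primary key already exists overwrites that row.
  // Returns the number of distinct rows left after all writes.
  def upsertAll[K](rows: Seq[(K, String)]): Int = {
    val table = mutable.Map.empty[K, String]
    rows.foreach { case (key, log) => table(key) = log }
    table.size
  }

  def main(args: Array[String]): Unit = {
    val logs = (1 to 1000).map(i => s"message-$i")

    // Keys taken from the wall clock: many messages share a millisecond.
    val byMillis = upsertAll(logs.map(log => (System.currentTimeMillis(), log)))

    // Random UUID keys: every message gets its own row.
    val byUuid = upsertAll(logs.map(log => (UUID.randomUUID(), log)))

    println(s"rows kept with millisecond keys: $byMillis")
    println(s"rows kept with UUID keys: $byUuid")
  }
}
```

The millisecond count varies from run to run and from machine to machine,
but it is normally far below 1000, while the UUID count stays at 1000.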

Regards,
Jerry Wong


On Wed, Feb 17, 2016 at 9:43 PM, yuanjia8...@163.com <yuanjia8...@163.com>
wrote:

> Hi Jerry,
>     1. Make sure that all 1000 messages have been sent to Kafka before
> consuming.
>     2. If you don't care about the ordering between messages, you can use
> multiple partitions and more consumers.
>
>
>
> LiYuanJia
>
> From: Jerry Wong
> Date: 2016-02-17 05:33
> To: users
> Subject: Optimize the performance of inserting data to Cassandra with
> Kafka and Spark Streaming
> Hello Everybody,
>
> I have a question about using Spark Streaming to consume data from Kafka
> and insert it into a Cassandra database, though I am not sure whether it
> belongs on the Kafka users mailing list. I would appreciate any
> suggestions.
>
> 5 AWS instances (each with 8 cores and 30 GB memory) for Spark,
> Hadoop, Cassandra
> Scala: 2.10.5
> Spark: 1.2.2
> Hadoop: 1.2.1
> Cassandra 2.0.18
>
> 3 AWS instances for the Kafka cluster (each with 8 cores and 30 GB memory)
> Kafka: 0.8.2.1
> Zookeeper: 3.4.6
>
> Other configurations:
> batchInterval = 6 Seconds
> blockInterval = 1500 millis
> spark.locality.wait = 500 millis
> #Consumers = 10
>
> There are two columns in the Cassandra table
> keySpaceOfTopicA.tableOfTopicA: "createdtime" and "log".
>
> Here is the relevant piece of code:
>
> // One receiver-based stream per consumer, one thread each for "topicA".
> @transient val kstreams = (1 to numConsumers.toInt).map { _ =>
>   KafkaUtils
>     .createStream(ssc, zkeeper, groupId, Map("topicA" -> 1),
>       StorageLevel.MEMORY_AND_DISK_SER)
>     .map(_._2.toString).map(Tuple1(_))
>     // Pair each message with the current wall-clock time in milliseconds.
>     .map { case (log) => (System.currentTimeMillis(), log) }
> }
> @transient val unifiedMessage = ssc.union(kstreams)
>
> unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
>   SomeColumns("createdtime", "log"))
>
> I created a producer that sends messages to the brokers (1000 messages at
> a time).
>
> But only about 100 messages are inserted into Cassandra in each round of
> testing.
> Can anybody advise me why the other messages (about 900) are not
> consumed?
> How should I configure and tune the parameters to improve the
> throughput of the consumers?
>
> Thank you very much for reading, and thanks in advance for any
> suggestions.
>
> Jerry Wong
>
