Re: Kinesis receiver spark streaming partition

2014-09-28 Thread Wei Liu
Chris,

Thought I would check back with you to see if you have made any progress
on this issue. Any good news so far? Thanks. Once again, I really
appreciate you looking into this issue.

Thanks,
Wei

On Thu, Aug 28, 2014 at 4:44 PM, Chris Fregly ch...@fregly.com wrote:

 great question, wei.  this is very important to understand from a
 performance perspective.  and this extends beyond kinesis - it's true for
 any streaming source that supports shards/partitions.

 i need to do a little research into the internals to confirm my theory.

 lemme get back to you!

 -chris


 On Tue, Aug 26, 2014 at 11:37 AM, Wei Liu wei@stellarloyalty.com
 wrote:

 We are exploring using Kinesis and Spark Streaming together. I took a
 look at the Kinesis receiver code in 1.1.0. I have a question regarding
 Kinesis partitions vs. Spark Streaming partitions. It seems to be pretty
 difficult to align these partitions.

 Kinesis partitions a stream of data into shards. If we follow the
 example, we will have multiple Kinesis receivers reading from the same
 stream in Spark Streaming. It seems like the Kinesis workers will
 coordinate among themselves and assign shards to themselves dynamically. A
 particular shard can be consumed by different Kinesis workers (and thus
 different Spark workers) over time, though not at the same time. Blocks
 are generated based on time intervals, RDDs are created from blocks, and
 RDDs are partitioned based on blocks. In the end, the data for a given
 shard will be spread across multiple blocks (possibly located on different
 Spark worker nodes).

 We will probably need to group this data again by shard and shuffle it
 around to achieve the same partitioning we had in Kinesis.

 Is there a better way to achieve this to avoid data reshuffling?

 Thanks,
 Wei





Kinesis receiver spark streaming partition

2014-08-26 Thread Wei Liu
We are exploring using Kinesis and Spark Streaming together. I took a
look at the Kinesis receiver code in 1.1.0. I have a question regarding
Kinesis partitions vs. Spark Streaming partitions. It seems to be pretty
difficult to align these partitions.

Kinesis partitions a stream of data into shards. If we follow the example,
we will have multiple Kinesis receivers reading from the same stream in
Spark Streaming. It seems like the Kinesis workers will coordinate among
themselves and assign shards to themselves dynamically. A particular shard
can be consumed by different Kinesis workers (and thus different Spark
workers) over time, though not at the same time. Blocks are generated based
on time intervals, RDDs are created from blocks, and RDDs are partitioned
based on blocks. In the end, the data for a given shard will be spread
across multiple blocks (possibly located on different Spark worker nodes).

We will probably need to group this data again by shard and shuffle it
around to achieve the same partitioning we had in Kinesis.

Is there a better way to achieve this to avoid data reshuffling?
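To make the question concrete, the regrouping we would end up doing looks
roughly like the sketch below. The stock KinesisUtils.createStream in 1.1.0
only hands us the record payload as Array[Byte], so decodeKeyAndPayload is
a made-up stand-in for however we would recover the partition key that the
producer embedded in each record; the groupByKey is exactly the shuffle we
would like to avoid.

  import org.apache.spark.HashPartitioner
  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.streaming.dstream.DStream

  // Re-key the received bytes by their Kinesis partition key and shuffle
  // once so that all data for a given key/shard lands in one Spark partition.
  // decodeKeyAndPayload is hypothetical: it recovers the partition key the
  // producer embedded in each record.
  def regroupByShard(
      stream: DStream[Array[Byte]],
      numShards: Int,
      decodeKeyAndPayload: Array[Byte] => (String, Array[Byte]))
    : DStream[(String, Iterable[Array[Byte]])] = {
    stream
      .map(decodeKeyAndPayload)                    // (partitionKey, payload)
      .groupByKey(new HashPartitioner(numShards))  // the shuffle in question
  }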

Thanks,
Wei


Re: Multiple column families vs Multiple tables

2014-08-19 Thread Wei Liu
Chutium, thanks for your advice. I will check out your links.

I sent the email to the wrong email address! Sorry for the spam.

Wei


On Tue, Aug 19, 2014 at 4:49 PM, chutium teng@gmail.com wrote:

 ö_ö  you should send this message to the hbase user list, not the spark
 user list...

 but i can give you some personal advice about this: keep the number of
 column families as small as possible!

 at least, using a prefix on the column qualifier could also be an idea.
 but read performance may be worse for a use case like yours, e.g.
 searching for a row with value X in column family A and value Y in column
 family B.
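 for example, the prefix idea could look roughly like this with the plain
 hbase client api. the table, family, and prefix names here are just made
 up, and a ColumnPrefixFilter restricts a read to one logical group:

   import org.apache.hadoop.hbase.HBaseConfiguration
   import org.apache.hadoop.hbase.client.{HTable, Put, Scan}
   import org.apache.hadoop.hbase.filter.ColumnPrefixFilter
   import org.apache.hadoop.hbase.util.Bytes

   // one real column family "d"; the logical groups "a_" and "b_" live in
   // the qualifier prefix instead of in separate families
   val conf  = HBaseConfiguration.create()
   val table = new HTable(conf, "events")   // illustrative table name

   val put = new Put(Bytes.toBytes("row-1"))
   put.add(Bytes.toBytes("d"), Bytes.toBytes("a_value"), Bytes.toBytes("x"))
   put.add(Bytes.toBytes("d"), Bytes.toBytes("b_value"), Bytes.toBytes("y"))
   table.put(put)

   // read back only the "a_" group, roughly what a second column family
   // would give you
   val scan = new Scan()
   scan.setFilter(new ColumnPrefixFilter(Bytes.toBytes("a_")))
   val scanner = table.getScanner(scan)
   scanner.close()
   table.close()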

 so it depends on which workload is important for you. if your use case is
 very read-heavy and you really want to use multiple column families while
 keeping good read performance, you should try to disable region splits,
 tune the compaction interval carefully, and so on.

 there is a good slide for this:

 http://photo.weibo.com/1431095941/wbphotos/large/mid/3735178188435939/pid/554cca85gw1eiloddlqa5j20or0ik77z

 more slides about hbase + coprocessor, hbase + hive and hbase + spark:
 http://www.weibo.com/1431095941/BeL90zozx








Data loss - Spark streaming and network receiver

2014-08-18 Thread Wei Liu
We are prototyping an application with Spark Streaming and Kinesis. We use
Kinesis to accept incoming transaction data, and then process it using
Spark Streaming. So far we have really liked both technologies, and we have
seen both maturing rapidly. We have almost settled on using these two
technologies, but we are a little scared by this paragraph in the
programming guide:

"For network-based data sources like Kafka and Flume, the received input
data is replicated in memory between nodes of the cluster (default
replication factor is 2). So if a worker node fails, then the system can
recompute the lost data from the leftover copy of the input data. However,
if the worker node where a network receiver was running fails, then a tiny
bit of data may be lost, that is, the data received by the system but not
yet replicated to other node(s). The receiver will be started on a
different node and it will continue to receive data."

Since our application cannot tolerate losing customer data, I am wondering
what the best way is for us to address this issue.
1) We are thinking of writing application-specific logic to address the
data loss. To us, the problem seems to be caused by the Kinesis receivers
advancing their checkpoint before we know for sure the data has been
replicated. For example, we could do another checkpoint ourselves to
remember the Kinesis sequence number for data that has been processed by
Spark Streaming (a rough sketch of this idea follows after this list). When
a Kinesis receiver is restarted due to a worker failure, we would restart
it from the checkpoint we tracked. We also worry that our driver program
(or the whole cluster) could die because of a bug in the application; the
above logic would allow us to resume from our last checkpoint.

Are there any best practices out there for this issue? I suppose many
folks are using Spark Streaming with network receivers; any suggestion is
welcome.
2) Write Kinesis data to S3 first, then either use it as a backup or read
from S3 in Spark Streaming. This is the safest approach, but it comes with
a performance/latency penalty. On the other hand, we may have to write data
to S3 anyway, since Kinesis only stores up to 24 hours of data, in case we
have a bad day in our server infrastructure.
3) Wait for this issue to be addressed in Spark Streaming. I found the
ticket https://issues.apache.org/jira/browse/SPARK-1647, but it is not
resolved yet.
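To make 1) concrete, here is roughly what we have in mind. The stock
receiver in 1.1.0 only gives us the payload bytes, so carrying the shard id
and sequence number along with each record is an assumption on our side,
and SequenceStore is just a placeholder for wherever we would persist
progress (DynamoDB, ZooKeeper, a database, ...).

  import org.apache.spark.SparkContext._
  import org.apache.spark.streaming.dstream.DStream

  // placeholder for whatever we use to persist per-shard progress
  trait SequenceStore {
    def save(shardId: String, sequenceNumber: String): Unit
  }

  // After each batch has been processed, remember the highest sequence
  // number seen per shard so that a restarted receiver could resume from
  // there. Assumes the stream carries (shardId, sequenceNumber, payload).
  def trackProgress(
      processed: DStream[(String, String, Array[Byte])],
      store: SequenceStore): Unit = {
    processed.foreachRDD { rdd =>
      rdd.map { case (shard, seq, _) => (shard, seq) }
         .reduceByKey((a, b) => if (BigInt(a) >= BigInt(b)) a else b) // numeric max
         .collect()
         .foreach { case (shard, seq) => store.save(shard, seq) }
    }
  }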

Thanks,
Wei


Re: Data loss - Spark streaming and network receiver

2014-08-18 Thread Wei Liu
Thank you all for responding to my question. I am pleasantly surprised by
how many prompt responses I got. It shows the strength of the Spark
community.

Kafka is still an option for us; I will check out the link provided by
Dibyendu.

Meanwhile, if someone out there has already figured this out with Kinesis,
please keep your suggestions coming. Thanks.

Thanks,
Wei


On Mon, Aug 18, 2014 at 9:31 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Dear All,

 Recently I have written a Spark Kafka consumer to solve this problem. We
 have also seen issues with KafkaUtils, which uses the high-level Kafka
 consumer and gives the consumer code no control over offset management.

 The code below solves this problem; it is being tested in our Spark
 cluster and is working fine as of now.

 https://github.com/dibbhatt/kafka-spark-consumer

 This is a low-level Kafka consumer using the Kafka SimpleConsumer API.

 Please have a look at it and let me know your opinion. It has been
 written to eliminate data loss by committing the offset only after the
 data is written to the BlockManager. Also, the existing high-level
 KafkaUtils does not have any feature to control data flow, and it gives
 Out Of Memory errors if there is too much backlog in Kafka. This consumer
 solves that problem as well. The code has been adapted from an earlier
 Storm Kafka consumer and has a lot of other features, like recovery from
 Kafka node failures, ZK failures, offset errors, etc.
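 Stripped of the Kafka plumbing, the commit-after-store idea is roughly
 the sketch below. This is not the actual code from the repository;
 OffsetSource just stands in for the SimpleConsumer fetch/commit logic, and
 whether store() waits until the block is actually replicated depends on
 the Spark version, a gap Jerry also describes below.

   import scala.collection.mutable.ArrayBuffer
   import org.apache.spark.storage.StorageLevel
   import org.apache.spark.streaming.receiver.Receiver

   // stand-in for the fetch/commit plumbing of the underlying source
   trait OffsetSource {
     def fetchBatch(): (ArrayBuffer[Array[Byte]], Long)  // (records, lastOffset)
     def commit(lastOffset: Long): Unit
   }

   class CommitAfterStoreReceiver(source: OffsetSource)
     extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_SER_2) {

     def onStart(): Unit = {
       new Thread("commit-after-store-receiver") {
         override def run(): Unit = {
           while (!isStopped()) {
             val (records, lastOffset) = source.fetchBatch()
             if (records.nonEmpty) {
               store(records)             // hand the batch to Spark first
               source.commit(lastOffset)  // only then advance the offset
             }
           }
         }
       }.start()
     }

     def onStop(): Unit = { }
   }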

 Regards,
 Dibyendu


 On Tue, Aug 19, 2014 at 9:49 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

 I think currently Spark Streaming lacks a data acknowledging mechanism
 for when data is stored and replicated in the BlockManager, so potentially
 data can be lost even after it has been pulled from Kafka. Say the data is
 stored only in the BlockGenerator and not yet in the BlockManager, while
 in the meantime Kafka itself commits the consumer offset, and at this
 point the node fails: from Kafka's point of view this part of the data has
 been fed into Spark Streaming, but it has not actually been processed, so
 potentially it will never be processed again unless you read the whole
 partition again.



 To solve this potential data loss problem, Spark Streaming needs to offer
 a data acknowledging mechanism, so that a custom Receiver can use this
 acknowledgement to do checkpointing or recovery, like Storm does.



 Besides, driver failure is another story that needs to be carefully
 considered. So currently it is hard to guarantee no data loss in Spark
 Streaming; it still needs to improve at some points :)



 Thanks

 Jerry



 *From:* Tobias Pfeiffer [mailto:t...@preferred.jp]
 *Sent:* Tuesday, August 19, 2014 10:47 AM
 *To:* Wei Liu
 *Cc:* user
 *Subject:* Re: Data loss - Spark streaming and network receiver



 Hi Wei,



 On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wei@stellarloyalty.com
 wrote:

 Since our application cannot tolerate losing customer data, I am
 wondering what the best way is for us to address this issue.

 1) We are thinking of writing application-specific logic to address the
 data loss. To us, the problem seems to be caused by the Kinesis receivers
 advancing their checkpoint before we know for sure the data has been
 replicated. For example, we could do another checkpoint ourselves to
 remember the Kinesis sequence number for data that has been processed by
 Spark Streaming. When a Kinesis receiver is restarted due to a worker
 failure, we would restart it from the checkpoint we tracked.



 This sounds pretty much like the way Kafka does it. So, I am not saying
 that the stock KafkaReceiver does what you want (it may or may not), but
 it should be possible to update the offset (which corresponds to the
 sequence number) in Zookeeper only after data has been replicated
 successfully. I guess replacing Kinesis with Kafka is not an option for
 you, but you may consider pulling Kinesis data into Kafka before
 processing it with Spark?
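 If you go the Kinesis-into-Kafka route, the bridge might look roughly
 like the sketch below: a KCL record processor that forwards each Kinesis
 record to a Kafka topic and only checkpoints the shard once the sends are
 acknowledged. The topic and broker settings are placeholders, and exact
 package names and interfaces differ between KCL and Kafka client versions.

   import java.util.{List => JList, Properties}
   import scala.collection.JavaConverters._
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
   import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
   import com.amazonaws.services.kinesis.model.Record
   import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

   // Forward each Kinesis record into Kafka; checkpoint the shard only
   // after Kafka has acknowledged the sends, so nothing is marked consumed
   // before it is safely in Kafka.
   class ForwardToKafkaProcessor(topic: String, brokers: String)
     extends IRecordProcessor {

     private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _

     override def initialize(shardId: String): Unit = {
       val props = new Properties()
       props.put("bootstrap.servers", brokers)
       props.put("key.serializer",
         "org.apache.kafka.common.serialization.ByteArraySerializer")
       props.put("value.serializer",
         "org.apache.kafka.common.serialization.ByteArraySerializer")
       producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
     }

     override def processRecords(records: JList[Record],
                                 checkpointer: IRecordProcessorCheckpointer): Unit = {
       val acks = records.asScala.map { r =>
         val payload = new Array[Byte](r.getData.remaining())
         r.getData.get(payload)
         producer.send(
           new ProducerRecord(topic, r.getPartitionKey.getBytes("UTF-8"), payload))
       }
       acks.foreach(_.get())      // wait for Kafka to acknowledge the batch
       checkpointer.checkpoint()  // only now advance the Kinesis checkpoint
     }

     override def shutdown(checkpointer: IRecordProcessorCheckpointer,
                           reason: ShutdownReason): Unit = {
       if (producer != null) producer.close()
       if (reason == ShutdownReason.TERMINATE) checkpointer.checkpoint()
     }
   }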



 Tobias