[
https://issues.apache.org/jira/browse/SAMZA-489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Martin Kleppmann reassigned SAMZA-489:
--------------------------------------
Assignee: Martin Kleppmann
I spent a while with [Ian Meyers|https://www.linkedin.com/in/ianmeyers] today,
working through some of the details of what Kinesis support for Samza could
look like.
The aforementioned sharding behaviour of Kinesis unfortunately does not fit
very well with a stateful processing model that has local state. The problem is
that a shard (which owns a certain range of the key hash space) can be split
arbitrarily, not just at the half-way point. This is a deliberate decision as it
gives operators the ability to deal with hot spots in the data: to give an
extreme example, if there's one particular key with particularly high message
volume, that key could be put in a shard all by itself. Kinesis doesn't provide
any clever tooling for determining how to split shards — it's pretty much left
up to the user.
(Ian and I think that this was probably a design mistake. It would have been
better to deal with hot spots by appending a random number to messages with the
hot key, and thus scatter them across the hash space. However, that would mean
losing the total ordering for those keys, and would make aggregations harder.
Trade-offs, trade-offs.)
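To make the salting idea concrete, here is a minimal Python sketch. It is not
anything Kinesis or Samza provides: the names hash_position and salted_key, the
"#" separator and the number of salts are made up for illustration, and it
assumes the MD5 digest of the partition key is read as a big-endian 128-bit
integer.

```python
import hashlib
import random


def hash_position(partition_key):
    """Position of a partition key in the 128-bit MD5 hash space.

    Assumption: the digest is interpreted as a big-endian unsigned integer.
    """
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def salted_key(key, num_salts, rng):
    """Append a random suffix so one hot key lands on up to num_salts positions.

    The "#<n>" suffix format is hypothetical; any reversible encoding would do.
    """
    return f"{key}#{rng.randrange(num_salts)}"


# Example: messages for one hot key get scattered across the hash space.
rng = random.Random(42)
for _ in range(3):
    key = salted_key("hot-key", 8, rng)
    print(key, hex(hash_position(key)))
```

A consumer that wants per-key results would have to strip the suffix and merge
the partial results, which is exactly where the ordering guarantee is lost.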
Anyway, this means that whilst the mapping from message keys to shards is clear
at one moment in time, that mapping may well change over time as shards are
split and merged. Any stateful process that wants to partition its state along
with the shards of the streams is thus going to have a hard time. This is not a
problem with Samza, it's an intrinsic issue with the way Kinesis works.
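A small Python sketch of why this hurts, under some assumptions: shards are
modelled as (starting hash, shard id) pairs, the MD5 digest is read as a
big-endian 128-bit integer, and the function names, shard ids and split point
are all invented for illustration. This is not the Kinesis API, just a model of
the key-to-shard mapping before and after a split.

```python
import hashlib
from bisect import bisect_right

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit hash


def hash_position(partition_key):
    """Position of a partition key in the MD5 hash space (big-endian, assumed)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_for(key, shards):
    """Return the id of the shard whose hash range contains the key.

    shards is a list of (starting_hash, shard_id) sorted by starting_hash;
    the first shard must start at 0 so that every hash is covered.
    """
    starts = [start for start, _ in shards]
    return shards[bisect_right(starts, hash_position(key)) - 1][1]


def split_shard(shards, shard_id, split_point, child_ids):
    """Replace one shard with two children, split at an arbitrary hash value."""
    index = [sid for _, sid in shards].index(shard_id)
    start = shards[index][0]
    end = shards[index + 1][0] if index + 1 < len(shards) else HASH_SPACE
    if not start < split_point < end:
        raise ValueError("split point must lie strictly inside the shard's range")
    children = [(start, child_ids[0]), (split_point, child_ids[1])]
    return shards[:index] + children + shards[index + 1:]


# Two shards that each own half of the hash space.
before = [(0, "shard-0"), (HASH_SPACE // 2, "shard-1")]
# Split shard-0 at one fifth of the hash space rather than at its midpoint.
after = split_shard(before, "shard-0", HASH_SPACE // 5, ("shard-2", "shard-3"))

for key in ["alice", "bob", "carol"]:
    print(key, shard_for(key, before), "->", shard_for(key, after))
```

In this model, every key that used to live in shard-0 ends up in shard-2 or
shard-3, and the boundary between the two is wherever the operator chose to put
it, so any state that was partitioned per shard no longer lines up.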
At the moment we're working on a prototype implementation of a Kinesis consumer
which will ignore this issue for now. It should be fine for consuming messages,
but probably won't support durable local state. (In-memory transient state is
fine, and state backed by a remote datastore is fine.) The lack of support for
local state is further reinforced by Kinesis only retaining data for 24 hours,
and not supporting log compaction like Kafka does, making it unsuitable as a
durable changelog for Samza's k-v stores.
Despite all those caveats, I reckon this will be a useful feature.
> Support Amazon Kinesis
> ----------------------
>
> Key: SAMZA-489
> URL: https://issues.apache.org/jira/browse/SAMZA-489
> Project: Samza
> Issue Type: New Feature
> Reporter: Martin Kleppmann
> Assignee: Martin Kleppmann
> Labels: project
>
> [AWS Kinesis|http://aws.amazon.com/kinesis/] is a publish-subscribe message
> broker service quite similar to Kafka, provided as a hosted service by
> Amazon. I have spoken to a few people who are interested in using Kinesis
> with Samza, since the options for stateful stream processing with Kinesis are
> currently quite limited. Samza's local state support would be great for
> Kinesis users.
> I've looked a little into what it would take to support Kinesis in Samza.
> Useful resources:
> * [Kinesis Client Library for
> Java|https://github.com/awslabs/amazon-kinesis-client]
> * [Kinesis developer
> guide|http://docs.aws.amazon.com/kinesis/latest/dev/introduction.html]
> * [Description of
> resharding|http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-api-java.html#kinesis-using-api-java-resharding]
> Kinesis is similar to Kafka in that it has total ordering of messages within
> a partition (which Kinesis calls a "shard"). The most notable differences I
> noticed are:
> * Kinesis does not support compaction by key, and only keeps messages for 24
> hours (the "trim horizon"). Thus it cannot be used for checkpointing and
> state store changelogging. Another service must be used for durable storage
> (Amazon recommends DynamoDB).
> * It is common for the number of shards in a stream to change ("resharding"),
> because a Kinesis shard is a unit of resourcing, not a logical grouping. A
> Kinesis shard is more like a Kafka broker node than a Kafka partition.
> The second point suggests that Kinesis shards should not be mapped 1:1 to
> Samza StreamTasks like we do for Kafka, because whenever the number of shards
> changes, any state associated with a StreamTask would no longer be in the
> right place.
> Kinesis assigns a message to a shard based on the MD5 hash of the message's
> partition key (so all messages with the same partition key are guaranteed to
> be in the same shard). Each shard owns a contiguous range of the MD5 hash
> space. When the number of shards is increased by one, a shard's hash range is
> subdivided into two sub-ranges. When the number of shards is decreased by
> one, two adjacent shards' hash ranges are merged into a single range.
> I think the nicest way of modelling this in Samza would be to create a fixed
> number of StreamTasks (e.g. 256, but make it configurable), and to assign
> each task a fixed slice of this MD5 hash space. Each Kinesis shard then
> corresponds to a subset of these StreamTasks, and the SystemConsumer
> implementation routes messages from a shard to the appropriate StreamTask
> based on the hash of the message's partition key. This implies that all the
> StreamTasks for a particular Kinesis shard should be processed within the
> same container. This is not unlike the Kafka consumer in Samza, which fetches
> messages for all of a container's Kafka partitions in one go.
> This removes the semantic problem of resharding: we can ensure that
> messages with the same partition key are always routed to the same
> StreamTask, even across shard splits and merges.
> However, there are still some tricky edge cases to handle. For example, if
> Kinesis decides to merge two shards that are currently processed by two
> different Samza containers, what should Samza do? A simple (but perhaps a bit
> wasteful) solution would be for both containers to continue consuming the
> merged shard. Alternatively, Samza could reassign some StreamTasks from one
> container to another, but that would require any state to be moved or
> rebuilt. Double-consuming would probably make the most sense for a first
> implementation.
> In summary, it looks like Kinesis support is feasible, and would be a fun
> challenge for someone to take on. Contributions welcome :)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)