Hi Chris -

On Apr 14, 2014, at 9:13 AM, Chris Riccomini <[email protected]> wrote:

> Hey Tyler,
> 
> """I'm trying to have multiple task instances run, each processing a
> separate partition, and it appears that only a single task instance runs,
> processing all partitions. Or else my partitions are not created
> properly."""
> 
> Are you trying to consume a single stream that has multiple partitions, or
> are you processing multiple streams that all have one partition? If it's
> the latter, all of these messages will get routed to a single task
> instance. There is an upcoming patch to allow alternative partition task
> instance mappings (SAMZA-71), which Jakob Homan is working on currently.

No, I'm trying for the former, although my SystemConsumer is set up like the 
latter. That is, I have a SystemConsumer that generates messages in a single 
partition, and a task that takes all of those messages and splits them across 
multiple partitions.

So, in my SystemConsumer I have:

        SystemStreamPartition systemStreamPartition =
            new SystemStreamPartition(systemName, streamId, new Partition(0));
        try {
            put(systemStreamPartition,
                new IncomingMessageEnvelope(systemStreamPartition, null, null, object));
        } catch (InterruptedException e) {
            // put() can block; rethrow if we get interrupted
            throw new SamzaException(e);
        }

which generates messages on the same stream + partition.

Then in my first task I have:

        messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
            partitionKey.getBytes(), outgoingMap));

which I am trying to get routed to separate task instances based on 
partitionKey.
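For what it's worth, my mental model (an assumption on my part, not verified against the Kafka producer source) is that the partition key is hashed modulo the partition count, so equal keys always land on the same partition and therefore the same task instance. A minimal stdlib-only sketch of that idea, with a made-up class name:

```java
// Toy sketch of hash-based key -> partition routing. Assumption: this
// mirrors the spirit of the default Kafka partitioner; PartitionRouter
// is a hypothetical name, not a Samza or Kafka class.
public class PartitionRouter {
    private final int numPartitions;

    public PartitionRouter(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Same key always maps to the same partition, so every message sent
    // with a given partitionKey is processed by one task instance.
    public int partitionFor(Object partitionKey) {
        return Math.abs(partitionKey.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        PartitionRouter router = new PartitionRouter(4);
        int first = router.partitionFor("user-42");
        int second = router.partitionFor("user-42");
        System.out.println(first == second); // stable routing for one key
    }
}
```

If that model holds, seeing all messages on one task instance would mean either all my partition keys hash alike (unlikely) or the topic really has only one partition.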

> 
> If you have a single input stream with multiple partitions, you should end
> up with one task instance per partition. This partitioning model is
> explained in some detail at the 20 minute mark in this talk:
> 
>  http://www.infoq.com/presentations/samza-linkedin
> 
> """the example in docs of collector.send(new OutgoingMessageEnvelope(new
> SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"),
> msg)) seems confusing, because the OutgoingMessageEnvelope constructor has
> a key AND partitionKey - I assume the partitionKey should be
> msg.get("user_id") in this case, but what should key be? Just a unique
> value for this message?"""
> 
> The OutgoingMessageEnvelope has several constructors. The two you're
> referring to are:
> 
>  public OutgoingMessageEnvelope(SystemStream systemStream, Object
> partitionKey, Object key, Object message)
> 
>  public OutgoingMessageEnvelope(SystemStream systemStream, Object key,
> Object message)
> 
> 
> This is, indeed, odd. In general, people only want the second constructor
> (systemStream, key, message). The constructor with the partitionKey has
> its origins in the Kafka API. With Kafka 0.8, keys are now stored along
> with messages in the actual log segments on the disk. This is useful
> because it means you can get access to the key information that was sent
> with the message. It also means that you can use log-compaction to
> de-duplicate keys in a Kafka topic (an 0.8.1 feature). There are some
> cases where you might wish to partition a topic by one key (say, member
> ID), but store (or de-duplicate by) a different key with the message.
> 

So in the case where I don't care about deduplication, is the second 
constructor's "key" parameter actually used as the partition key?




> """Do I need to specify partition manager and yarn.container.count to get
> multiple instances of my task working to service separate partitions?"""
> 
> This class has been replaced by the KafkaSystemFactory and
> KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory, the
> partitions will be handled properly by Samza. The yarn.container.count
> simply specifies how many containers (java processes) you get to run your
> tasks in. If you have only one TaskInstance, but specify a container count
> of 2, the second container won't have any partitions to process, and I
> believe the job will fail. You need to set your container count <=  the
> partition count of your input topics.

OK, so it sounds like I should be able to have multiple task instances in a 
single container, if the partitioning works.
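For my own reference, the knobs involved as I understand them (a sketch in the style of the hello-samza properties files; the topic name is made up):

```
# Assumed sketch based on hello-samza-style configs.
# Samza creates one task instance per input partition; a single
# container can host many task instances.
task.inputs=kafka.my-partitioned-topic
# Must be <= the partition count of the input topic(s).
yarn.container.count=2
```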

Thanks!
Tyson


> 
> Cheers,
> Chris
> 
> On 4/11/14 12:55 PM, "Tyson Norris" <[email protected]> wrote:
> 
>> Possibly related - I cannot seem to find the source for
>> KafkaPartitionManager - can someone point me to it?
>> 
>> Thanks
>> Tyson
>> 
>> On Apr 11, 2014, at 12:15 PM, Tyson Norris <[email protected]> wrote:
>> 
>>> Hi - 
>>> I have a couple questions about partitioning - I'm trying to have
>>> multiple task instances run, each processing a separate partition, and
>>> it appears that only a single task instance runs, processing all
>>> partitions. Or else my partitions are not created properly. This is
>>> based on a modified version of hello-samza, so I'm not sure exactly
>>> which config+code steps to take to enable partitioning of messages to
>>> multiple instances of the same task.
>>> 
>>> To route to a partition i use: messageCollector.send(new
>>> OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey,
>>> outgoingMap));
>>> - question here: the example in docs of collector.send(new
>>> OutgoingMessageEnvelope(new SystemStream("kafka",
>>> "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems
>>> confusing, because the OutgoingMessageEnvelope constructor has a key AND
>>> partitionKey - I assume the partitionKey should be msg.get("user_id") in
>>> this case, but what should key be? Just a unique value for this message?
>>> 
>>> I tried the 3 parameter constructor as well and have similar problems,
>>> where my single task instance is used regardless of partitionKey
>>> specified in the OutgoingMessageEnvelope.
>>> 
>>> Do I need to specify partition manager and yarn.container.count to get
>>> multiple instances of my task working to service separate partitions?
>>> 
>>> I'm not sure how to tell if my messages are routed to the correct
>>> partition in kafka, or whether the problem is a partition handling
>>> config in samza.
>>> 
>>> Any advice is appreciated!
>>> 
>>> Thanks
>>> Tyson
> 
