I am actually wondering if I’m missing a bit of configuration that specifies 
the number of partitions to create for the various Kafka topics that 
messages are sent to.

I don’t see where this should be added in the config, and it appears the 
partitions are not created automatically when I specify the key for 
partitioning.
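
(For reference, a sketch of the Kafka-side setting in question, assuming a standard 0.8-era broker setup; the value below is illustrative, not from this thread. A topic's partition count is fixed when the topic is created, either explicitly via a kafka-topics.sh-style create command or implicitly via the broker default for auto-created topics:)

```
# Broker-side default partition count for auto-created topics
# (server.properties; the value 4 is illustrative)
num.partitions=4
```

Samza itself does not create or resize Kafka partitions; specifying a partition key only selects among the partitions the topic already has.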


Thanks
Tyson



On Apr 14, 2014, at 10:35 AM, Tyson Norris 
<[email protected]> wrote:

Hi Chris -

On Apr 14, 2014, at 9:13 AM, Chris Riccomini 
<[email protected]> wrote:

Hey Tyson,

"""I¹m trying to have multiple tasks instances run, each processing a
separate partition, and it appears that only a single task instance runs,
processing all partitions. Or else my partitions are not created
properly."""

Are you trying to consume a single stream that has multiple partitions, or
are you processing multiple streams that all have one partition? If it's
the latter, all of these messages will get routed to a single task
instance. There is an upcoming patch to allow alternative partition task
instance mappings (SAMZA-71), which Jakob Homan is working on currently.

No, I’m trying for the former, although my SystemConsumer is set up like the 
latter. That is, I have a SystemConsumer that should generate messages in a 
single partition, and a task that takes all messages and splits them into 
multiple partitions.

So, in my SystemConsumer I have:

       SystemStreamPartition systemStreamPartition =
           new SystemStreamPartition(systemName, streamId, new Partition(0));
       try {
           put(systemStreamPartition,
               new IncomingMessageEnvelope(systemStreamPartition, null, null, object));
       } catch (InterruptedException e) {
           throw new SamzaException(e);
       }

which generates messages on the same stream + partition.

Then in my first task I have:
messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
    partitionKey.getBytes(), outgoingMap));

which I am trying to get routed to separate task instances based on 
partitionKey.
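
(For reference, a self-contained sketch of how a keyed send is routed, roughly mirroring Kafka 0.8's default partitioner behavior; this is an illustrative stand-in, not Samza or Kafka source:)

```java
// Illustrative sketch: Kafka's default partitioner hashes the partition
// key and takes it modulo the topic's partition count, so the same key
// always lands on the same partition (class and method names are mine).
public class PartitionSketch {
    static int partitionFor(byte[] partitionKey, int numPartitions) {
        // Mask the sign bit so the modulo result is non-negative.
        return (java.util.Arrays.hashCode(partitionKey) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes();
        int first = partitionFor(key, 4);
        int second = partitionFor(key, 4);
        // Deterministic routing: repeated sends with one key hit one partition.
        System.out.println(first == second);
    }
}
```

The consequence for the thread's question: a partition key only picks among existing partitions; if the topic has one partition, every key maps to it and a single task instance sees everything.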


If you have a single input stream with multiple partitions, you should end
up with one task instance per partition. This partitioning model is
explained in some detail at the 20 minute mark in this talk:

http://www.infoq.com/presentations/samza-linkedin

"""the example in docs of collector.send(new OutgoingMessageEnvelope(new
SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"),
msg)) seems confusing, because the OutgoingMessageEnvelope constructor has
a key AND partitionKey - I assume the partitionKey should be
msg.get(³user_id²) in this case, but what should key be? Just a unique
value for this message?"""

The OutgoingMessageEnvelope has several constructors. The two you're
referring to are:

public OutgoingMessageEnvelope(SystemStream systemStream, Object
partitionKey, Object key, Object message)

public OutgoingMessageEnvelope(SystemStream systemStream, Object key,
Object message)


This is, indeed, odd. In general, people only want the second constructor
(systemStream, key, message). The constructor with the partitionKey has
its origins in the Kafka API. With Kafka 0.8, keys are now stored along
with messages in the actual log segments on the disk. This is useful
because it means you can get access to the key information that was sent
with the message. It also means that you can use log-compaction to
de-duplicate keys in a Kafka topic (a 0.8.1 feature). There are some
cases where you might wish to partition a topic by one key (say, member
ID), but store (or de-duplicate by) a different key with the message.
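
(A self-contained sketch of the distinction, using stand-in classes rather than the real Samza API; the behavior shown, the key doubling as the partition key when no separate partitionKey is given, matches the description above but the class is illustrative:)

```java
// Stand-in for the two envelope constructors discussed above.
public class EnvelopeSketch {
    static class Envelope {
        final Object partitionKey, key, message;

        // 4-arg-style form: partition by one key, store another with the message.
        Envelope(Object partitionKey, Object key, Object message) {
            this.partitionKey = partitionKey;
            this.key = key;
            this.message = message;
        }

        // 2-arg-style form: the stored key is also used for partitioning.
        Envelope(Object key, Object message) {
            this(key, key, message);
        }
    }

    public static void main(String[] args) {
        Envelope e = new Envelope("user-42", "payload");
        // With the shorter constructor, partitionKey and key coincide.
        System.out.println(e.partitionKey.equals(e.key));
    }
}
```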


So in the case where I don’t care about de-duplication, is the second 
constructor’s “key” parameter actually used as the partition key?




"""Do I need to specify partition manager and yarn.container.count to get
multiple instances of my task working to service separate partitions?"""

KafkaPartitionManager has been replaced by the KafkaSystemFactory and
KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory, the
partitions will be handled properly by Samza. The yarn.container.count
simply specifies how many containers (java processes) you get to run your
tasks in. If you have only one TaskInstance, but specify a container count
of 2, the second container won't have any partitions to process, and I
believe the job will fail. You need to set your container count <= the
partition count of your input topics.
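
(A minimal config sketch of that relationship; the topic name and counts are illustrative, not from this thread:)

```
# Samza job config fragment (illustrative): with a 4-partition input
# topic, a container count of 2 gives each container 2 partitions;
# a count above 4 would leave some containers with nothing to process.
task.inputs=kafka.SomeTopicPartitionedByUserId
yarn.container.count=2
```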

OK, so it sounds like I should be able to have multiple task instances in a 
single container, if the partitioning works.

Thanks!
Tyson



Cheers,
Chris

On 4/11/14 12:55 PM, "Tyson Norris" 
<[email protected]> wrote:

Possibly related - I cannot seem to find the source for
KafkaPartitionManager - can someone point me to it?

Thanks
Tyson

On Apr 11, 2014, at 12:15 PM, Tyson Norris 
<[email protected]> wrote:

Hi -
I have a couple questions about partitioning - I'm trying to have
multiple task instances run, each processing a separate partition, and
it appears that only a single task instance runs, processing all
partitions. Or else my partitions are not created properly. This is
based on a modified version of hello-samza, so I'm not sure exactly
which config+code steps to take to enable partitioning of messages to
multiple instances of the same task.

To route to a partition I use: messageCollector.send(new
OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey,
outgoingMap));
- question here: the example in docs of collector.send(new
OutgoingMessageEnvelope(new SystemStream("kafka",
"SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems
confusing, because the OutgoingMessageEnvelope constructor has a key AND
partitionKey - I assume the partitionKey should be msg.get("user_id") in
this case, but what should key be? Just a unique value for this message?

I tried the 3-parameter constructor as well and have similar problems,
where my single task instance is used regardless of partitionKey
specified in the OutgoingMessageEnvelope.

Do I need to specify partition manager and yarn.container.count to get
multiple instances of my task working to service separate partitions?

I'm not sure how to tell if my messages are routed to the correct
partition in kafka, or whether the problem is a partition handling
config in samza.

Any advice is appreciated!

Thanks
Tyson
