OK, that was wrong as well (my SystemFactory is indeed supposed to use a single 
partition, just not the Kafka system factory), but I finally got things working 
as expected with some Kafka config changes. 

For now, I set in deploy/kafka/config/server.properties:
num.partitions=5

(although I assume there is a better per-topic value I should set instead of a 
default like this)
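
For reference, the per-topic route appears to be setting the partition count when the topic is created, rather than relying on the broker-wide default. A sketch using the kafka-topics.sh script that ships with Kafka 0.8.1; the topic name and ZooKeeper address below are placeholders:

```
# Create the topic with 5 partitions explicitly, instead of relying on
# the broker-wide num.partitions default in server.properties.
bin/kafka-topics.sh --zookeeper localhost:2181 --create \
  --topic wikipedia-edits --partitions 5 --replication-factor 1
```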

And now I see multiple task instances created as desired. 
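
For anyone who finds this thread later: my understanding (an assumption on my part, not verified against the Samza/Kafka sources) is that a keyed producer maps the partition key to a partition roughly as abs(hash(key)) % numPartitions, and Samza creates one task instance per input partition. A tiny self-contained Java sketch of that idea; the class and method names are illustrative, not the real API:

```java
public class PartitionSketch {
    // Illustrative key-to-partition mapping, modeled on the idea of
    // abs(hash(key)) % numPartitions; the mask keeps the hash non-negative.
    static int partitionFor(String partitionKey, int numPartitions) {
        return (partitionKey.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With num.partitions=1 every key maps to partition 0, so only one
        // task instance ever gets work; with 5 partitions the keys spread out.
        String[] keys = {"user-1", "user-2", "user-3", "user-4"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, 5));
        }
    }
}
```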

Thanks
Tyson

On Apr 14, 2014, at 12:36 PM, Tyson Norris <[email protected]> wrote:

> OK sorry for the noise. 
> I stumbled upon another clue - my SystemFactory has (based on 
> WikipediaSystemFactory) :
>    @Override
>    public SystemAdmin getAdmin(String systemName, Config config) {
>        return new SinglePartitionWithoutOffsetsSystemAdmin();
>    }
> 
> Which I guess is a good reason my system is only using a single partition. 
> Doh.
> I will work on a new SystemFactory impl to test with...
> 
> Thanks
> Tyson
> 
> On Apr 14, 2014, at 12:20 PM, Tyson Norris <[email protected]> wrote:
> 
>> I am actually wondering if I’m missing a bit of configuration that indicates 
>> the number of partitions I want to create for various kafka topics that 
>> messages are sent to?
>> 
>> I don’t see where this should be added in the config, and it appears the 
>> partitions are not created automatically when I specify the key for 
>> partitioning.
>> 
>> 
>> Thanks
>> Tyson
>> 
>> 
>> 
>> On Apr 14, 2014, at 10:35 AM, Tyson Norris <[email protected]> wrote:
>> 
>> Hi Chris -
>> 
>> On Apr 14, 2014, at 9:13 AM, Chris Riccomini <[email protected]> wrote:
>> 
>> Hey Tyson,
>> 
>> """I’m trying to have multiple task instances run, each processing a
>> separate partition, and it appears that only a single task instance runs,
>> processing all partitions. Or else my partitions are not created
>> properly."""
>> 
>> Are you trying to consume a single stream that has multiple partitions, or
>> are you processing multiple streams that all have one partition? If it's
>> the latter, all of these messages will get routed to a single task
>> instance. There is an upcoming patch to allow alternative partition task
>> instance mappings (SAMZA-71), which Jakob Homan is working on currently.
>> 
>> No, I’m trying for the former, although my SystemConsumer is set up like the 
>> latter.  That is, I have a system consumer that should generate messages in 
>> a single partition, and a task that takes all messages and splits them into 
>> multiple partitions.
>> 
>> So, in my SystemConsumer I have:
>>      SystemStreamPartition systemStreamPartition = new 
>> SystemStreamPartition(systemName, streamId, new Partition(0));
>>      try {
>>          put(systemStreamPartition, new 
>> IncomingMessageEnvelope(systemStreamPartition, null, null, object));
>> 
>> which generates messages on the same stream + partition.
>> 
>> Then in my first task I have:
>> messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, 
>> partitionKey.getBytes(), outgoingMap));
>> 
>> which I am trying to get routed to separate task instances based on 
>> partitionKey.
>> 
>> 
>> If you have a single input stream with multiple partitions, you should end
>> up with one task instance per partition. This partitioning model is
>> explained in some detail at the 20 minute mark in this talk:
>> 
>> http://www.infoq.com/presentations/samza-linkedin
>> 
>> """the example in docs of collector.send(new OutgoingMessageEnvelope(new
>> SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"),
>> msg)) seems confusing, because the OutgoingMessageEnvelope constructor has
>> a key AND partitionKey - I assume the partitionKey should be
>> msg.get("user_id") in this case, but what should key be? Just a unique
>> value for this message?"""
>> 
>> The OutgoingMessageEnvelope has several constructors. The two you're
>> referring to are:
>> 
>> public OutgoingMessageEnvelope(SystemStream systemStream, Object
>> partitionKey, Object key, Object message)
>> 
>> public OutgoingMessageEnvelope(SystemStream systemStream, Object key,
>> Object message)
>> 
>> 
>> This is, indeed, odd. In general, people only want the second constructor
>> (systemStream, key, message). The constructor with the partitionKey has
>> its origins in the Kafka API. With Kafka 0.8, keys are now stored along
>> with messages in the actual log segments on the disk. This is useful
>> because it means you can get access to the key information that was sent
>> with the message. It also means that you can use log-compaction to
>> de-duplicate keys in a Kafka topic (an 0.8.1 feature). There are some
>> cases where you might wish to partition a topic by one key (say, member
>> ID), but store (or de-duplicate by) a different key with the message.
>> 
>> 
>> So in the case where I don’t care about deduplication, is the second 
>> constructor “key” parameter actually used as the partition key?
>> 
>> 
>> 
>> 
>> """Do I need to specify partition manager and yarn.container.count to get
>> multiple instances of my task working to service separate partitions?"""
>> 
>> This class has been replaced by the KafkaSystemFactory and
>> KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory, the
>> partitions will be handled properly by Samza. The yarn.container.count
>> simply specifies how many containers (java processes) you get to run your
>> tasks in. If you have only one TaskInstance, but specify a container count
>> of 2, the second container won't have any partitions to process, and I
>> believe the job will fail. You need to set your container count <=  the
>> partition count of your input topics.
>> 
>> Ok, so it sounds like I should be able to have multiple task instances in a 
>> single container, if the partitioning works.
>> 
>> Thanks!
>> Tyson
>> 
>> 
>> 
>> Cheers,
>> Chris
>> 
>> On 4/11/14 12:55 PM, "Tyson Norris" <[email protected]> wrote:
>> 
>> Possibly related - I cannot seem to find the source for
>> KafkaPartitionManager - can someone point me to it?
>> 
>> Thanks
>> Tyson
>> 
>> On Apr 11, 2014, at 12:15 PM, Tyson Norris <[email protected]> wrote:
>> 
>> Hi -
>> I have a couple questions about partitioning - I’m trying to have
>> multiple task instances run, each processing a separate partition, and
>> it appears that only a single task instance runs, processing all
>> partitions. Or else my partitions are not created properly. This is
>> based on a modified version of hello-samza, so I’m not sure exactly
>> which config+code steps to take to enable partitioning of messages to
>> multiple instances of the same task.
>> 
>> To route to a partition I use: messageCollector.send(new
>> OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey,
>> outgoingMap));
>> - question here: the example in docs of collector.send(new
>> OutgoingMessageEnvelope(new SystemStream("kafka",
>> "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems
>> confusing, because the OutgoingMessageEnvelope constructor has a key AND
>> partitionKey - I assume the partitionKey should be msg.get("user_id") in
>> this case, but what should key be? Just a unique value for this message?
>> 
>> I tried the 3 parameter constructor as well and have similar problems,
>> where my single task instance is used regardless of partitionKey
>> specified in the OutgoingMessageEnvelope.
>> 
>> Do I need to specify partition manager and yarn.container.count to get
>> multiple instances of my task working to service separate partitions?
>> 
>> I’m not sure how to tell if my messages are routed to the correct
>> partition in kafka, or whether the problem is a partition handling
>> config in samza.
>> 
>> Any advice is appreciated!
>> 
>> Thanks
>> Tyson
>> 
> 
