Hi -
Just following up on this: I ran into a couple of other problems while trying
to use a String as the partition key.
Passing String types for either key or partitionKey generated:
KafkaSystemProducer [WARN] Triggering a reconnect for kafka because connection
failed: java.lang.ClassCastException: java.lang.String cannot be cast to [B
I was able to configure kafka with:
partitioner.class=kafka.producer.DefaultPartitioner
With that, I can now use a String for partitionKey and a byte[] for key,
which worked (yay!).
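A likely reason a String partitionKey becomes acceptable once partitioner.class=kafka.producer.DefaultPartitioner is set: to the best of my knowledge, that partitioner is hash-based, choosing a partition from the key's hashCode() modulo the partition count, so it works for any key object rather than only byte[]. A minimal sketch of hash-modulo partitioning under that assumption (the class and method names are hypothetical, and masking the sign bit is one common way to handle negative hash codes, not necessarily Kafka's exact code):

```java
// Hypothetical illustration of hash-modulo partitioning, in the style of
// Kafka 0.8's DefaultPartitioner. Not Kafka's actual code.
final class HashPartitionSketch {

    // Map any key object to a partition in [0, numPartitions).
    static int partition(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so negative hash codes still give a valid index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands in the same partition.
        for (String userId : new String[] {"alice", "bob", "alice"}) {
            System.out.println(userId + " -> partition " + partition(userId, 5));
        }
    }
}
```

Because only hashCode() is used, a String and its UTF-8 byte[] form would generally hash to different partitions, so producers should agree on one key type.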
However, if I pass only key (as a String), it still fails with the same
java.lang.ClassCastException: java.lang.String cannot be cast to [B
I also tried setting this in the Kafka producer.properties:
key.serializer.class=kafka.serializer.StringEncoder
but had the same results.
Is there something special I need to do to use a String key when using:
public OutgoingMessageEnvelope(SystemStream systemStream,
                               java.lang.Object key,
                               java.lang.Object message)
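The symptoms above (byte[] keys work, String keys throw "java.lang.String cannot be cast to [B", and the Kafka key.serializer.class setting changes nothing) suggest the producer path casts keys to byte[] itself. One workaround consistent with what already worked is to encode String keys explicitly before building the envelope. A minimal sketch; the helper names are hypothetical, and UTF-8 is an assumed choice made so that every producer and consumer agrees, unlike a bare getBytes(), which uses the platform default charset:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helpers: convert String keys to byte[] before handing them
// to a producer that casts keys to byte[].
final class KeyBytes {

    // Encode with an explicit charset instead of the platform default.
    static byte[] encode(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Inverse of encode, e.g. for reading keys back on the consumer side.
    static String decode(byte[] keyBytes) {
        return new String(keyBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Object key = encode("user-42");
        // The kind of cast that failed for a String key now succeeds.
        byte[] asBytes = (byte[]) key;
        System.out.println(Arrays.toString(asBytes) + " -> " + decode(asBytes));
    }
}
```

In the three-argument constructor this would mean passing KeyBytes.encode(myKey) as key instead of the String itself.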
Thanks
Tyson
On Apr 14, 2014, at 12:58 PM, Chris Riccomini <[email protected]> wrote:
> Hey Tyler,
>
> Yeah, sorry this is not more clear. Physical Kafka partitions are
> per-topic. The default partition count for a newly created topic in Kafka
> is defined using the num.partitions setting, as you've discovered. The
> default setting that Kafka ships with is 1. This can be overridden on a
> per-topic basis by using the kafka-create-topic.sh tool.
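Chris's explanation amounts to a simple rule for a topic's partition count: an explicit per-topic count (for example, one given when creating the topic with kafka-create-topic.sh) takes precedence; otherwise the broker's num.partitions default applies, and that default ships as 1. A hypothetical sketch of that rule, modeling the broker settings as plain Java values (the class and method names are illustrative, not Kafka's):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how a topic's partition count is chosen.
final class TopicPartitionCount {
    private final int defaultNumPartitions;                        // broker num.partitions
    private final Map<String, Integer> perTopic = new HashMap<>(); // explicit counts

    TopicPartitionCount(int defaultNumPartitions) {
        this.defaultNumPartitions = defaultNumPartitions;
    }

    // Record an explicit count, as if the topic were created with a tool.
    void createTopic(String topic, int partitions) {
        perTopic.put(topic, partitions);
    }

    // Explicit per-topic count if present, else the broker default.
    int partitionsFor(String topic) {
        return perTopic.getOrDefault(topic, defaultNumPartitions);
    }

    public static void main(String[] args) {
        TopicPartitionCount broker = new TopicPartitionCount(1); // shipped default
        broker.createTopic("SomeTopicPartitionedByUserId", 5);
        System.out.println(broker.partitionsFor("SomeTopicPartitionedByUserId")); // 5
        System.out.println(broker.partitionsFor("auto-created-topic"));           // 1
    }
}
```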
>
> Cheers,
> Chris
>
> On 4/14/14 12:47 PM, "Tyson Norris" <[email protected]> wrote:
>
>> OK, that was wrong as well (my SystemFactory is indeed supposed to use a
>> single partition; the Kafka system factory is not), but I finally got
>> things working as expected with some Kafka config changes.
>>
>> For now, I set in deploy/kafka/config/server.properties:
>> num.partitions=5
>>
>> (although I assume there is a better per-topic value I should set instead
>> of a default like this)
>>
>> And now I see multiple task instances created as desired.
>>
>> Thanks
>> Tyson
>>
>> On Apr 14, 2014, at 12:36 PM, Tyson Norris <[email protected]> wrote:
>>
>>> OK sorry for the noise.
>>> I stumbled upon another clue - my SystemFactory has (based on
>>> WikipediaSystemFactory):
>>> @Override
>>> public SystemAdmin getAdmin(String systemName, Config config) {
>>>     return new SinglePartitionWithoutOffsetsSystemAdmin();
>>> }
>>>
>>> Which I guess is a good reason my system is only using a single
>>> partition. Doh.
>>> I will work on a new SystemFactory impl to test with...
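The clue works because the number of task instances follows the partitions the system's admin reports for the input stream: an admin hard-wired to a single partition yields one task instance no matter how the data is keyed. A hypothetical sketch of that relationship; the PartitionReporter interface is a simplified stand-in invented for illustration, not Samza's actual SystemAdmin API:

```java
import java.util.Set;
import java.util.TreeSet;

// Simplified, hypothetical stand-in for a system admin: it only reports
// which partition ids exist for a stream.
interface PartitionReporter {
    Set<Integer> partitions(String stream);
}

final class AdminSketch {

    // Mirrors the idea behind a single-partition admin: always partition 0.
    static final PartitionReporter SINGLE = stream -> Set.of(0);

    // An admin backed by a real partition count, e.g. 5 Kafka partitions.
    static PartitionReporter fixed(int count) {
        return stream -> {
            Set<Integer> ids = new TreeSet<>();
            for (int i = 0; i < count; i++) {
                ids.add(i);
            }
            return ids;
        };
    }

    // One task instance per reported partition of the input stream.
    static int taskInstances(PartitionReporter admin, String inputStream) {
        return admin.partitions(inputStream).size();
    }

    public static void main(String[] args) {
        System.out.println(taskInstances(SINGLE, "my-input"));   // 1
        System.out.println(taskInstances(fixed(5), "my-input")); // 5
    }
}
```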
>>>
>>> Thanks
>>> Tyson
>>>
>>> On Apr 14, 2014, at 12:20 PM, Tyson Norris <[email protected]> wrote:
>>>
>>>> I am actually wondering if I’m missing a bit of configuration that
>>>> indicates the number of partitions I want to create for various kafka
>>>> topics that messages are sent to?
>>>>
>>>> I don’t see where this should be added in the config, and it appears
>>>> the partitions are not created automatically when I specify the key for
>>>> partitioning.
>>>>
>>>>
>>>> Thanks
>>>> Tyson
>>>>
>>>>
>>>>
>>>> On Apr 14, 2014, at 10:35 AM, Tyson Norris
>>>> <[email protected]> wrote:
>>>>
>>>> Hi Chris -
>>>>
>>>> On Apr 14, 2014, at 9:13 AM, Chris Riccomini
>>>> <[email protected]> wrote:
>>>>
>>>> Hey Tyler,
>>>>
>>>> """I¹m trying to have multiple tasks instances run, each processing a
>>>> separate partition, and it appears that only a single task instance
>>>> runs,
>>>> processing all partitions. Or else my partitions are not created
>>>> properly."""
>>>>
>>>> Are you trying to consume a single stream that has multiple partitions, or
>>>> are you processing multiple streams that all have one partition? If it's
>>>> the latter, all of these messages will get routed to a single task
>>>> instance. There is an upcoming patch to allow alternative
>>>> partition-to-task-instance mappings (SAMZA-71), which Jakob Homan is
>>>> currently working on.
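The routing Chris describes follows from grouping input by partition number: partition 0 of every input stream goes to the same task instance, so several one-partition streams collapse into one task, while one stream with several partitions fans out. A hypothetical sketch of that default grouping (GroupByPartition and StreamPartition are illustrative names; the alternative mappings planned in SAMZA-71 are not modeled):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of grouping input partitions into task
// instances by partition id.
final class GroupByPartition {

    // Illustrative (stream, partition id) pair.
    static final class StreamPartition {
        final String stream;
        final int partition;

        StreamPartition(String stream, int partition) {
            this.stream = stream;
            this.partition = partition;
        }
    }

    // Task name -> stream partitions consumed by that task.
    static Map<String, List<StreamPartition>> group(List<StreamPartition> inputs) {
        Map<String, List<StreamPartition>> tasks = new TreeMap<>();
        for (StreamPartition sp : inputs) {
            tasks.computeIfAbsent("Partition " + sp.partition, k -> new ArrayList<>()).add(sp);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Three streams with one partition each collapse into a single task.
        List<StreamPartition> manyStreams = List.of(
            new StreamPartition("a", 0), new StreamPartition("b", 0), new StreamPartition("c", 0));
        // One stream with three partitions fans out to three tasks.
        List<StreamPartition> onePartitioned = List.of(
            new StreamPartition("a", 0), new StreamPartition("a", 1), new StreamPartition("a", 2));
        System.out.println(group(manyStreams).keySet());    // [Partition 0]
        System.out.println(group(onePartitioned).keySet()); // [Partition 0, Partition 1, Partition 2]
    }
}
```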
>>>>
>>>> No, I'm trying for the former, although my SystemConsumer is set up
>>>> like the latter. That is, I have a system consumer that should
>>>> generate messages in a single partition, and a task that takes all
>>>> messages and splits them into multiple partitions.
>>>>
>>>> So, in my SystemConsumer I have:
>>>> SystemStreamPartition systemStreamPartition =
>>>>     new SystemStreamPartition(systemName, streamId, new Partition(0));
>>>> try {
>>>>     put(systemStreamPartition,
>>>>         new IncomingMessageEnvelope(systemStreamPartition, null, null, object));
>>>> } catch (InterruptedException e) {
>>>>     Thread.currentThread().interrupt();
>>>> }
>>>>
>>>> which generates messages on the same stream + partition.
>>>>
>>>> Then in my first task I have:
>>>> messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>>>     partitionKey.getBytes(), outgoingMap));
>>>>
>>>> which I am trying to get routed to separate task instances based on
>>>> partitionKey.
>>>>
>>>>
>>>> If you have a single input stream with multiple partitions, you should end
>>>> up with one task instance per partition. This partitioning model is
>>>> explained in some detail at the 20-minute mark in this talk:
>>>>
>>>> http://www.infoq.com/presentations/samza-linkedin
>>>>
>>>> """the example in docs of collector.send(new
>>>> OutgoingMessageEnvelope(new
>>>> SystemStream("kafka", "SomeTopicPartitionedByUserId"),
>>>> msg.get("user_id"),
>>>> msg)) seems confusing, because the OutgoingMessageEnvelope constructor
>>>> has
>>>> a key AND partitionKey - I assume the partitionKey should be
>>>> msg.get(³user_id²) in this case, but what should key be? Just a unique
>>>> value for this message?"""
>>>>
>>>> The OutgoingMessageEnvelope has several constructors. The two you're
>>>> referring to are:
>>>>
>>>> public OutgoingMessageEnvelope(SystemStream systemStream,
>>>>     Object partitionKey, Object key, Object message)
>>>>
>>>> public OutgoingMessageEnvelope(SystemStream systemStream,
>>>>     Object key, Object message)
>>>>
>>>>
>>>> This is, indeed, odd. In general, people only want the second constructor
>>>> (systemStream, key, message). The constructor with the partitionKey has
>>>> its origins in the Kafka API. With Kafka 0.8, keys are now stored along
>>>> with messages in the actual log segments on disk. This is useful because
>>>> it means you can get access to the key information that was sent with the
>>>> message. It also means that you can use log compaction to de-duplicate
>>>> keys in a Kafka topic (a 0.8.1 feature). There are some cases where you
>>>> might wish to partition a topic by one key (say, member ID), but store (or
>>>> de-duplicate by) a different key with the message.
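The distinction can be sketched as an envelope carrying two keys: a partition key that only decides which partition receives the message, and a key that is stored with the message (and could drive log compaction). The sketch also assumes, without confirmation in this thread, that the three-argument form simply reuses key as the partition key; EnvelopeSketch and its fields are hypothetical, not Samza's OutgoingMessageEnvelope:

```java
// Hypothetical envelope modeling the two constructor shapes discussed above.
final class EnvelopeSketch {
    final String stream;
    final Object partitionKey; // decides which partition receives the message
    final Object key;          // stored alongside the message in the log
    final Object message;

    EnvelopeSketch(String stream, Object partitionKey, Object key, Object message) {
        this.stream = stream;
        this.partitionKey = partitionKey;
        this.key = key;
        this.message = message;
    }

    // Assumption: the three-argument form partitions by the stored key.
    EnvelopeSketch(String stream, Object key, Object message) {
        this(stream, key, key, message);
    }

    // Hash-modulo choice of partition, looking only at partitionKey.
    int targetPartition(int numPartitions) {
        return (partitionKey.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Partition by member ID, but store (and compact by) an event ID.
        EnvelopeSketch byMember = new EnvelopeSketch("events", "member-7", "event-123", "payload");
        EnvelopeSketch simple = new EnvelopeSketch("events", "member-7", "payload");
        // Both share a partition key, so they land in the same partition.
        System.out.println(byMember.targetPartition(5) == simple.targetPartition(5)); // true
    }
}
```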
>>>>
>>>>
>>>> So in the case where I don’t care about deduplication, is the second
>>>> constructor's “key” parameter actually used as the partition key?
>>>>
>>>>
>>>>
>>>>
>>>> """Do I need to specify partition manager and yarn.container.count to
>>>> get
>>>> multiple instances of my task working to service separate
>>>> partitions?"""
>>>>
>>>> The partition manager class has been replaced by the KafkaSystemFactory
>>>> and KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory,
>>>> the partitions will be handled properly by Samza. The yarn.container.count
>>>> simply specifies how many containers (Java processes) you get to run your
>>>> tasks in. If you have only one TaskInstance, but specify a container count
>>>> of 2, the second container won't have any partitions to process, and I
>>>> believe the job will fail. You need to set your container count <= the
>>>> partition count of your input topics.
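The container rule can be sketched as a check plus an assignment of task instances (one per input partition) to containers. The round-robin assignment and all names below are assumptions for illustration, not Samza's actual scheduling; the point is that several task instances can share one container, while a container count above the partition count is rejected, mirroring Chris's expectation that such a job fails:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: spread one task instance per partition over containers.
final class ContainerAssignment {

    // Returns, for each container, the partition ids (task instances) it runs.
    static List<List<Integer>> assign(int partitionCount, int containerCount) {
        if (containerCount < 1 || containerCount > partitionCount) {
            throw new IllegalArgumentException(
                "container count must be between 1 and the partition count ("
                    + partitionCount + "), got " + containerCount);
        }
        List<List<Integer>> containers = new ArrayList<>();
        for (int c = 0; c < containerCount; c++) {
            containers.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            containers.get(p % containerCount).add(p); // round-robin
        }
        return containers;
    }

    public static void main(String[] args) {
        System.out.println(assign(5, 2)); // [[0, 2, 4], [1, 3]]
        System.out.println(assign(5, 1)); // [[0, 1, 2, 3, 4]]
        try {
            assign(1, 2); // the second container would have nothing to process
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```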
>>>>
>>>> OK, so it sounds like I should be able to have multiple task instances
>>>> in a single container, if the partitioning works.
>>>>
>>>> Thanks!
>>>> Tyson
>>>>
>>>>
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 4/11/14 12:55 PM, "Tyson Norris"
>>>> <[email protected]> wrote:
>>>>
>>>> Possibly related - I cannot seem to find the source for
>>>> KafkaPartitionManager - can someone point me to it?
>>>>
>>>> Thanks
>>>> Tyson
>>>>
>>>> On Apr 11, 2014, at 12:15 PM, Tyson Norris
>>>> <[email protected]> wrote:
>>>>
>>>> Hi -
>>>> I have a couple of questions about partitioning - I'm trying to have
>>>> multiple task instances run, each processing a separate partition, and
>>>> it appears that only a single task instance runs, processing all
>>>> partitions. Or else my partitions are not created properly. This is
>>>> based on a modified version of hello-samza, so I'm not sure exactly
>>>> which config+code steps to take to enable partitioning of messages to
>>>> multiple instances of the same task.
>>>>
>>>> To route to a partition I use: messageCollector.send(new
>>>> OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey,
>>>> outgoingMap));
>>>> - question here: the example in docs of collector.send(new
>>>> OutgoingMessageEnvelope(new SystemStream("kafka",
>>>> "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems
>>>> confusing, because the OutgoingMessageEnvelope constructor has a key AND
>>>> partitionKey - I assume the partitionKey should be msg.get("user_id") in
>>>> this case, but what should key be? Just a unique value for this message?
>>>>
>>>> I tried the 3 parameter constructor as well and have similar problems,
>>>> where my single task instance is used regardless of partitionKey
>>>> specified in the OutgoingMessageEnvelope.
>>>>
>>>> Do I need to specify partition manager and yarn.container.count to get
>>>> multiple instances of my task working to service separate partitions?
>>>>
>>>> I'm not sure how to tell if my messages are routed to the correct
>>>> partition in kafka, or whether the problem is a partition handling
>>>> config in samza.
>>>>
>>>> Any advice is appreciated!
>>>>
>>>> Thanks
>>>> Tyson
>>>>
>>>
>>
>