My bad... I should have considered this in the first place. You are
absolutely right.

Support for this kind of join is a work in progress:
https://issues.apache.org/jira/browse/KAFKA-3705
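
To make the problem concrete, here is a minimal plain-Java sketch of the
semantics discussed in this thread. It uses ordinary maps, not the Kafka
Streams API, and the class and method names are made up for illustration.
It shows why swapping key and value loses a row when values are not unique,
and what the desired lookup result looks like for the sample tables.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: plain maps stand in for the two tables.
// This is NOT Kafka Streams code; all names here are hypothetical.
class LookupJoinSketch {

    // Swap key and value. If two rows share a value, the later row
    // overwrites the earlier one, because the value becomes the key.
    static Map<String, Integer> swapKeyAndValue(Map<Integer, String> table) {
        Map<String, Integer> swapped = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> row : table.entrySet()) {
            swapped.put(row.getValue(), row.getKey());
        }
        return swapped;
    }

    // Desired result: keep table1's key and use table1's value as the
    // lookup key into table2. Rows without a match are dropped.
    static Map<Integer, Integer> lookupJoin(Map<Integer, String> table1,
                                            Map<String, Integer> table2) {
        Map<Integer, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> row : table1.entrySet()) {
            Integer match = table2.get(row.getValue());
            if (match != null) {
                result.put(row.getKey(), match);
            }
        }
        return result;
    }

    // Sample data from the original question.
    static Map<Integer, String> sampleTable1() {
        Map<Integer, String> t = new LinkedHashMap<>();
        t.put(111, "aaa");
        t.put(222, "bbb");
        t.put(333, "aaa");
        return t;
    }

    static Map<String, Integer> sampleTable2() {
        Map<String, Integer> t = new LinkedHashMap<>();
        t.put("aaa", 999);
        t.put("bbb", 888);
        t.put("ccc", 777);
        return t;
    }

    public static void main(String[] args) {
        // 111 is lost: prints {aaa=333, bbb=222}
        System.out.println(swapKeyAndValue(sampleTable1()));
        // prints {111=999, 222=888, 333=999}
        System.out.println(lookupJoin(sampleTable1(), sampleTable2()));
    }
}
```

The second method is what a non-key join would have to provide. A plain
key swap cannot get there, since 111 and 333 collapse into one row.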

Your custom solution (Option 1) might work. But as you mentioned, Table2
gets replicated (and not partitioned) over all Processor instances, which
might become a problem.

-Matthias


On 07/14/2016 05:22 PM, Srikanth wrote:
> I should have mentioned that I tried this. It worked in other cases but will
> not work for this one.
> I'm pasting a sample from table1 that I gave in my first email.
> 
> Table1
>   111 -> aaa
>   222 -> bbb
>   333 -> aaa
> 
> Here the value (aaa) is not unique, so I can't just make it the key: 333
> would then overwrite 111.
> 
> Srikanth
> 
> On Thu, Jul 14, 2016 at 11:07 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> You will need to set a new key before you do the re-partitioning. In
>> your case, it seems you want to switch key and value. This can be done
>> with a simple map
>>
>>> table1.toStream()
>>>       .map(new KeyValueMapper<K, V, KeyValue<V, K>>() {
>>>              public KeyValue<V, K> apply(K key, V value) {
>>>                return new KeyValue<V, K>(value, key);
>>>              }
>>>            })
>>>       .to("my-repartioning-topic");
>>> newTable1 = builder.table("my-repartioning-topic");
>>
>> With K and V being the actual types of key and value in table1.
>>
>> Of course, you can modify the table entries in map() in any other way
>> that suits your use case. You only need to make sure to set the (join)
>> key before you do the re-partitioning.
>>
>> -Matthias
>>
>>
>> On 07/14/2016 04:47 PM, Srikanth wrote:
>>> Matthias,
>>>
>>> With option 2, how would we perform the join after re-partitioning? Although
>>> we re-partitioned by value, the key doesn't change.
>>> KTable joins always use keys, and ValueJoiner gets values from both tables
>>> when keys match.
>>>
>>> Having data co-located will not be sufficient, right?
>>>
>>> Srikanth
>>>
>>> On Thu, Jul 14, 2016 at 4:12 AM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> I would recommend re-partitioning as described in Option 2.
>>>>
>>>> -Matthias
>>>>
>>>> On 07/13/2016 10:53 PM, Srikanth wrote:
>>>>> Hello,
>>>>>
>>>>> I'm trying the following join using KTable. There are two change log
>>>>> tables.
>>>>> Table1
>>>>>   111 -> aaa
>>>>>   222 -> bbb
>>>>>   333 -> aaa
>>>>>
>>>>> Table2
>>>>>   aaa -> 999
>>>>>   bbb -> 888
>>>>>   ccc -> 777
>>>>>
>>>>> My result table should be
>>>>>   111 -> 999
>>>>>   222 -> 888
>>>>>   333 -> 999
>>>>>
>>>>> It's not a case for join() as the keys don't match. It's more of a lookup
>>>>> table.
>>>>>
>>>>> Option 1 is to use Table1.toStream().process(ProcessorSupplier(),
>>>>> "storeName").
>>>>> punctuate() will use a regular Kafka consumer that reads updates from
>>>>> Table2 and updates a private map.
>>>>> process() will do a key-value lookup.
>>>>> This has an advantage when Table1 is much larger than Table2.
>>>>> Each instance of the processor will have to hold the entire Table2.
>>>>>
>>>>> Option 2 is to re-partition Table1 using through(StreamPartitioner),
>>>>> partitioning by value.
>>>>> This will ensure co-location. Then join with Table2. This part might be
>>>>> tricky?
>>>>>
>>>>> Your comments and suggestions are welcome!
>>>>>
>>>>> Srikanth
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
