Hi Jon,

If I understand your question correctly:
- Any internal changelog topics that the DSL creates for KTables (e.g. from
aggregations) are automatically created with cleanup.policy=compact, so you
don't need to do anything special for those.
- For a KTable built directly from an existing topic with KStreamBuilder#table(),
you'll have to set cleanup.policy=compact on that Kafka topic yourself; there is
no setting for this in the Streams configuration.
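
To make it concrete, here is a rough sketch of how I'd wire up the join with
the 0.10.x DSL. The topic names ("clicks", "users", "clicks-enriched"), the
"users-store" store name and the properties are made up for illustration:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class JoinExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default serdes for this sketch; adjust to your key/value types.
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();

        // "users" is the topic backing the KTable. Compaction is configured
        // on that topic, outside the application, e.g. when creating it:
        //   kafka-topics.sh ... --config cleanup.policy=compact
        KStream<String, String> clicks = builder.stream("clicks");
        KTable<String, String> users = builder.table("users", "users-store");

        clicks.leftJoin(users, (click, user) -> click + " / " + user)
              .to("clicks-enriched");

        // Note: no cleanup-policy setting goes into the Streams config itself.
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}

The join itself runs the same either way; the cleanup policy only matters when
an instance has to rebuild the KTable's state store from the "users" topic.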

Eno

> On 13 Feb 2017, at 11:16, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> 
> If I'm doing a KStream.leftJoin(KTable), how would I set this configuration
> for just the KTable portion?
> 
> IE I have
> 
> KStream = KStreamBuilder.stream()
> KTable = KStreamBuilder.table()
> 
> ...
> (join occurs.. data flows.. ppl are brought closer together.. there is
> peace in the valley.. for me... )
> ...
> 
> KafkaStreams = new KafkaStreams(KStreamBuilder,
> config_with_cleanup_policy_or_not?)
> KafkaStreams.start()
> 
> On Wed, Feb 8, 2017 at 12:30 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
> 
>> Yeah makes sense. I was looking at it from the point of view of keeping
>> all data forever.
>> 
>> Eno
>> 
>>> On 8 Feb 2017, at 20:27, Matthias J. Sax <matth...@confluent.io> wrote:
>>> 
>>> Yes, that could happen if a key was not updated for a longer period than
>>> topic retention time.
>>> 
>>> If you want to force a changelog creation, you can do a dummy aggregate
>>> instead of using KStreamBuilder#table()
>>> 
>>> 
>>>> KTable table = KStreamBuilder.stream("topic").groupByKey().reduce(
>>>>   new Reducer() {
>>>>     @Override
>>>>     public Object apply(Object oldValue, Object newValue) {
>>>>       return newValue;
>>>>     }
>>>>   }, "someStoreName");
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 2/8/17 11:39 AM, Mathieu Fenniak wrote:
>>>> I think there could be correctness implications... the default
>>>> cleanup.policy of delete would mean that topic entries past the
>>>> retention policy might have been removed.  If you scale up the
>>>> application, new application instances won't be able to restore a
>>>> complete table into its local state store.  An operation like a join
>>>> against that KTable would find no records where there should be
>>>> records.
>>>> 
>>>> Mathieu
>>>> 
>>>> 
>>>> On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska <eno.there...@gmail.com>
>>>> wrote:
>>>> 
>>>>> If you fail to set the policy to compact, there shouldn't be any
>>>>> correctness implications; however, your topics will grow larger than
>>>>> necessary.
>>>>> 
>>>>> Eno
>>>>> 
>>>>>> On 8 Feb 2017, at 18:56, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>>>>>> 
>>>>>> What are the ramifications of failing to do this?
>>>>>> 
>>>>>> On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>> wrote:
>>>>>> 
>>>>>>> Yes, that is correct.
>>>>>>> 
>>>>>>> 
>>>>>>> -Matthias
>>>>>>> 
>>>>>>> 
>>>>>>> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
>>>>>>>> Hey kafka users,
>>>>>>>> 
>>>>>>>> Is it correct that a Kafka topic that is used for a KTable should
>>>>>>>> be set to cleanup.policy=compact?
>>>>>>>> 
>>>>>>>> I've never noticed until today that the KStreamBuilder#table()
>>>>>>>> documentation says: "However, no internal changelog topic is
>>>>>>>> created since the original input topic can be used for
>>>>>>>> recovery"... [1], which seems like it is only true if the topic is
>>>>>>>> configured for compaction.  Otherwise the original input topic
>>>>>>>> won't necessarily contain the data necessary for recovery of the
>>>>>>>> state store.
>>>>>>>> 
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf079517523c83060/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java#L355
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> 
>>>>>>>> Mathieu
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 
