Yeah, makes sense. I was looking at it from the point of view of keeping all
data forever.

Eno
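
To make the retention point concrete, here is a toy, single-partition model in plain Java. It has no Kafka dependency, and the class and method names are made up for illustration. It simplifies a lot: real delete retention removes whole log segments rather than individual records, and real compaction never touches the active segment. It replays the same log after each cleanup policy and shows that a key whose last update is older than the retention window disappears from the restored table under delete, but survives compaction.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RetentionVsCompaction {

    static final long DAY = 24L * 60 * 60 * 1000;

    /** One record in a toy, single-partition topic (illustrative, not a Kafka class). */
    static final class Rec {
        final String key;
        final String value;
        final long timestampMs;

        Rec(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Simplified cleanup.policy=delete: drop every record older than the retention window. */
    static List<Rec> applyDeleteRetention(List<Rec> log, long nowMs, long retentionMs) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : log) {
            if (nowMs - r.timestampMs <= retentionMs) {
                kept.add(r);
            }
        }
        return kept;
    }

    /** Simplified cleanup.policy=compact: keep only the latest record per key, in log order. */
    static List<Rec> applyCompaction(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key); // re-insert so the survivor keeps its later log position
            latest.put(r.key, r);
        }
        return new ArrayList<>(latest.values());
    }

    /** Restoring a table = replaying the log and keeping the last value seen per key. */
    static Map<String, String> restoreTable(List<Rec> log) {
        Map<String, String> table = new TreeMap<>(); // sorted, so printing is deterministic
        for (Rec r : log) {
            table.put(r.key, r.value);
        }
        return table;
    }

    /** Hypothetical sample data: "alice" is written once and never updated again. */
    static List<Rec> sampleLog() {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("alice", "v1", 1 * DAY));
        log.add(new Rec("bob", "v1", 2 * DAY));
        log.add(new Rec("bob", "v2", 29 * DAY));
        return log;
    }

    public static void main(String[] args) {
        long now = 30 * DAY;
        long retention = 7 * DAY; // same as the broker default log.retention.hours=168
        System.out.println("delete:  " + restoreTable(applyDeleteRetention(sampleLog(), now, retention)));
        System.out.println("compact: " + restoreTable(applyCompaction(sampleLog())));
    }
}
```

Running main prints `delete:  {bob=v2}` and `compact: {alice=v1, bob=v2}`: under delete, "alice" is simply gone after a restore, which is exactly the missing-join-partner case described further down the thread.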

> On 8 Feb 2017, at 20:27, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> Yes, that could happen if a key was not updated for longer than the
> topic's retention time.
> 
> If you want to force a changelog topic to be created, you can do a dummy
> aggregation instead of using KStreamBuilder#table():
> 
> 
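>> import org.apache.kafka.streams.kstream.KStreamBuilder;
>> import org.apache.kafka.streams.kstream.KTable;
>> import org.apache.kafka.streams.kstream.Reducer;
>> // assumes String keys/values and matching default serdes
>> KStreamBuilder builder = new KStreamBuilder();
>> KTable<String, String> table = builder.<String, String>stream("topic")
>>     .groupByKey()
>>     .reduce(new Reducer<String>() {
>>         @Override
>>         public String apply(String oldValue, String newValue) {
>>             return newValue; // keep only the latest value per key
>>         }
>>     }, "someStoreName");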
> 
> 
> -Matthias
> 
> 
> On 2/8/17 11:39 AM, Mathieu Fenniak wrote:
>> I think there could be correctness implications... the default
>> cleanup.policy of delete means that topic entries older than the retention
>> period might have been removed.  If you scale up the application, new
>> application instances won't be able to restore a complete table into their
>> local state stores.  An operation like a join against that KTable would find
>> no record where there should be one.
>> 
>> Mathieu
>> 
>> 
>> On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>> 
>>> If you fail to set the policy to compact, there shouldn't be any
>>> correctness implications; however, your topics will grow larger than
>>> necessary.
>>> 
>>> Eno
>>> 
>>>> On 8 Feb 2017, at 18:56, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>>>> 
>>>> What are the ramifications of failing to do this?
>>>> 
>>>> On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>> 
>>>>> Yes, that is correct.
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
>>>>>> Hey kafka users,
>>>>>> 
>>>>>> Is it correct that a Kafka topic that is used for a KTable should be set to
>>>>>> cleanup.policy=compact?
>>>>>> 
>>>>>> I've never noticed until today that the KStreamBuilder#table()
>>>>>> documentation says: "However, no internal changelog topic is created since
>>>>>> the original input topic can be used for recovery"... [1], which seems like
>>>>>> it is only true if the topic is configured for compaction.  Otherwise the
>>>>>> original input topic won't necessarily contain the data necessary for
>>>>>> recovery of the state store.
>>>>>> 
>>>>>> [1]
>>>>>> https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf079517523c83060/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java#L355
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Mathieu
>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
>> 
> 
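
On why the "dummy" aggregation doesn't change the table's contents: a reducer that always returns newValue is just last-write-wins per key. The toy fold below is plain Java; `BinaryOperator` stands in for Kafka's `Reducer`, and the class and method names are hypothetical. It shows that folding the stream this way yields the same latest-value-per-key view that a compacted topic would restore. In real Kafka Streams the difference is where that view is made durable: as far as I know, the state store behind reduce() is backed by an internal changelog topic that Streams creates with cleanup.policy=compact, so recovery no longer depends on the input topic's retention.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class LastWriteWinsReduce {

    /** A key/value pair as it would arrive on the input topic (toy stand-in, not a Kafka class). */
    static final class KV {
        final String key;
        final String value;

        KV(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Fold the stream per key, roughly the way groupByKey().reduce(...) behaves:
     * the first value for a key is stored as-is, later values are combined with the reducer.
     */
    static Map<String, String> reduceByKey(List<KV> stream, BinaryOperator<String> reducer) {
        Map<String, String> store = new LinkedHashMap<>();
        for (KV kv : stream) {
            store.merge(kv.key, kv.value, reducer);
        }
        return store;
    }

    public static void main(String[] args) {
        List<KV> stream = Arrays.asList(
            new KV("alice", "v1"),
            new KV("bob", "v1"),
            new KV("bob", "v2"),
            new KV("alice", "v3"));

        // The "dummy" reducer from the thread: ignore the old value, keep the newest one.
        Map<String, String> table = reduceByKey(stream, (oldValue, newValue) -> newValue);
        System.out.println(table); // {alice=v3, bob=v2}
    }
}
```

One caveat, if I remember the KGroupedStream#reduce() javadoc correctly: records with a null key or value are skipped, so tombstones on the input topic won't delete entries the way they would with KStreamBuilder#table().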
