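Hi Jon,

If I understand your question correctly:
- any new KTables created by the DSL (for example, the result of an aggregation, which is backed by an internal changelog topic) will automatically get the right policy (compact). You don't need to do anything special.
- otherwise, for example for the input topic you read with KStreamBuilder.table(), you'll have to set cleanup.policy=compact on the Kafka topic itself; it isn't something you pass in the KafkaStreams config.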
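Matthias's dummy-aggregate workaround, quoted further down, is one way to get such a DSL-created KTable: groupByKey().reduce(...) is backed by a state store with an internal changelog topic. Its reducer simply returns newValue, so the resulting table holds the latest value per key, much like a table read with KStreamBuilder.table() (ignoring null-valued tombstones, which the two treat differently). With generics, the quoted anonymous Reducer could also be written as the lambda (oldValue, newValue) -> newValue. The plain-Java sketch below models only that fold and has no Kafka dependency; the KeyValue record and the reduceByKey helper are hypothetical names for illustration, not Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class LatestValueReduce {

    // A keyed record as it might appear on the input topic (hypothetical type for this sketch).
    record KeyValue(String key, String value) {}

    // Same contract as the quoted Reducer: given the current aggregate and a new value,
    // return the new aggregate. Returning newValue means "last write wins".
    static final BinaryOperator<String> KEEP_NEWEST = (oldValue, newValue) -> newValue;

    // Folds records in arrival order into a key -> value table,
    // loosely mimicking what groupByKey().reduce(...) keeps in its state store.
    static Map<String, String> reduceByKey(List<KeyValue> records, BinaryOperator<String> reducer) {
        Map<String, String> table = new LinkedHashMap<>();
        for (KeyValue r : records) {
            // merge() stores the value for a new key, otherwise applies reducer(old, new)
            table.merge(r.key(), r.value(), reducer);
        }
        return table;
    }

    public static void main(String[] args) {
        List<KeyValue> topic = List.of(
            new KeyValue("a", "1"),
            new KeyValue("b", "2"),
            new KeyValue("a", "3"));
        System.out.println(reduceByKey(topic, KEEP_NEWEST)); // prints {a=3, b=2}
    }
}
```

In the real DSL, every update to that store is also written to the compacted changelog topic, which is what makes the table restorable regardless of the input topic's cleanup policy.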
Eno

> On 13 Feb 2017, at 11:16, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>
> If I'm doing a KStream.leftJoin(KTable), how would I set this configuration
> for just the KTable portion?
>
> I.e. I have
>
> KStream = KStreamBuilder.stream()
> KTable = KStreamBuilder.table()
>
> ...
> (join occurs.. data flows.. ppl are brought closer together.. there is
> peace in the valley.. for me... )
> ...
>
> KafkaStreams = new KafkaStream(KStreamBuilder,
> config_with_cleanup_policy_or_not?)
> KafkaStream.start
>
> On Wed, Feb 8, 2017 at 12:30 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> Yeah, makes sense. I was looking at it from the point of view of keeping
>> all data forever.
>>
>> Eno
>>
>>> On 8 Feb 2017, at 20:27, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> Yes, that could happen if a key was not updated for a longer period than
>>> the topic retention time.
>>>
>>> If you want to force a changelog creation, you can do a dummy aggregate
>>> instead of using KStreamBuilder#table():
>>>
>>>
>>>> KTable table = KStreamBuilder.stream("topic").groupByKey().reduce(new Reducer() {
>>>>     @Override
>>>>     public Object apply(Object oldValue, Object newValue) {
>>>>         return newValue;
>>>>     }
>>>> }, "someStoreName");
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 2/8/17 11:39 AM, Mathieu Fenniak wrote:
>>>> I think there could be correctness implications... the default
>>>> cleanup.policy of delete would mean that topic entries past the retention
>>>> policy might have been removed. If you scale up the application, new
>>>> application instances won't be able to restore a complete table into their
>>>> local state stores. An operation like a join against that KTable would find
>>>> no records where there should be a record.
>>>>
>>>> Mathieu
>>>>
>>>>
>>>> On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska <eno.there...@gmail.com>
>>>> wrote:
>>>>
>>>>> If you fail to set the policy to compact, there shouldn't be any
>>>>> correctness implications; however, your topics will grow larger than
>>>>> necessary.
>>>>>
>>>>> Eno
>>>>>
>>>>>> On 8 Feb 2017, at 18:56, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>>>>>>
>>>>>> What are the ramifications of failing to do this?
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, that is correct.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
>>>>>>>> Hey Kafka users,
>>>>>>>>
>>>>>>>> Is it correct that a Kafka topic that is used for a KTable should be set to
>>>>>>>> cleanup.policy=compact?
>>>>>>>>
>>>>>>>> I've never noticed until today that the KStreamBuilder#table()
>>>>>>>> documentation says: "However, no internal changelog topic is created since
>>>>>>>> the original input topic can be used for recovery"... [1], which seems like
>>>>>>>> it is only true if the topic is configured for compaction. Otherwise the
>>>>>>>> original input topic won't necessarily contain the data necessary for
>>>>>>>> recovery of the state store.
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf079517523c83060/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java#L355
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Mathieu
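The restore problem Mathieu and Matthias describe can be made concrete with a small plain-Java model (no Kafka dependency): replay a topic into a map, as a new instance does when it restores a KTable's state store from the input topic, after the topic has been cleaned up either by time-based deletion or by compaction. The LogRecord type, the helper names, and the 7-day retention (the broker default, log.retention.hours=168) are assumptions for illustration. The model also simplifies how brokers behave: real deletion removes whole log segments rather than single records, and compaction never touches the active segment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CleanupPolicyDemo {

    // One record on a topic partition (hypothetical model): offset, timestamp in ms, key, value.
    record LogRecord(long offset, long timestamp, String key, String value) {}

    // Approximates cleanup.policy=delete: records older than the retention window are dropped,
    // even when they hold the only (and therefore latest) value for their key.
    static List<LogRecord> applyDelete(List<LogRecord> log, long now, long retentionMs) {
        List<LogRecord> kept = new ArrayList<>();
        for (LogRecord r : log) {
            if (now - r.timestamp() <= retentionMs) {
                kept.add(r);
            }
        }
        return kept;
    }

    // Approximates cleanup.policy=compact: keep only the latest record per key, in offset order.
    static List<LogRecord> applyCompact(List<LogRecord> log) {
        Map<String, Long> latestOffset = new HashMap<>();
        for (LogRecord r : log) {
            latestOffset.put(r.key(), r.offset()); // later offsets overwrite earlier ones
        }
        List<LogRecord> kept = new ArrayList<>();
        for (LogRecord r : log) {
            if (latestOffset.get(r.key()).longValue() == r.offset()) {
                kept.add(r);
            }
        }
        return kept;
    }

    // Rebuilds the table the way a fresh instance would: replay the topic from the beginning.
    static Map<String, String> restore(List<LogRecord> log) {
        Map<String, String> store = new LinkedHashMap<>();
        for (LogRecord r : log) {
            store.put(r.key(), r.value());
        }
        return store;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long now = 10 * day;
        long retentionMs = 7 * day; // assumed retention, matching the broker default
        List<LogRecord> log = List.of(
            new LogRecord(0, 0L, "user-1", "gold"),        // never updated again
            new LogRecord(1, 1 * day, "user-2", "silver"),
            new LogRecord(2, 8 * day, "user-2", "gold"));

        System.out.println(restore(applyDelete(log, now, retentionMs))); // prints {user-2=gold}
        System.out.println(restore(applyCompact(log)));                   // prints {user-1=gold, user-2=gold}
    }
}
```

Under delete, user-1 vanishes from the restored table because its only record aged out, which is the "no records where there should be a record" join failure described above; under compact, every key's latest value survives, so the input topic really can serve as the table's changelog.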