My apologies, the append log behavior was due to the repartion logs not
being cleaned up. Still the log compation has none influence on the
aggregated records in the procedure mentioned above. Are there any other
tricks one could use? Exactly once does not seem to have effect in this
particular case.

Thank you by the way.



On Thu, Dec 14, 2017 at 7:47 PM, Artur Mrozowski <art...@gmail.com> wrote:

> Hi Gouzhang,
> thank you for the answer. Indeed the value is being populated now, however
> the application behaves oddly and not how it used to. I suspect that
> disabling caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 has
> been persisited somehow.
>
> It seems as log compaction has been disabled permanently. What I observe
> now is rather log append. For each run the output will be more and more
> duplicates.
>
> Normally I would have hard time to reproduce duplication with that number
> of records, 3 in this case. I am trying to implement same idea as you
> described in KIP 150. Normally I would not observe duplicates until
> aggregation in line 495
>
> https://github.com/afuyo/KStreamsDemo/blob/master/src/
> main/java/kstream.demo/CustomerStreamPipelineHDI.java#L425
>
> I could get rid of large number of duplicates using exactly once semantics
> but not anymore. I run on version 0.11 of Kafka Streams. What do you think
> could be causing it? Is version 1.0 more stable in this aspect?
> Best regards
> Artur
>
> On Thu, Dec 14, 2017 at 6:42 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Artur,
>>
>> This is because Properties#getProperty() is expecting a String value, and
>> hence 10 * 1024 * 1024L is not recognized; you can try "10485760".
>>
>>
>> Guozhang
>>
>> On Wed, Dec 13, 2017 at 10:51 PM, Artur Mrozowski <art...@gmail.com>
>> wrote:
>>
>> > Sure.
>> >
>> > Another observation I've made is that before I started modifying these
>> > properties I could spot quite a few duplicates in the state store. Then
>> I
>> > applied exactly once semantics which removed most of the duplicates.
>> > Finally I disabled cache by setting CACHE_MAX_BYTES_BUFFERING_CONFIG
>> to 0
>> > which duplicates each record. Since then I've been trying to reenable
>> it.
>> >
>> >  StreamsConfig config = new StreamsConfig(getProperties());
>> >
>> >
>> > System.out.println(getProperties().getProperty(StreamsConfig
>> .PROCESSING_
>> > GUARANTEE_CONFIG));
>> >
>> > System.out.println(getProperties().getProperty(
>> > StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
>> >
>> > System.out.println(getProperties().getProperty(StreamsConfig.STATE_DIR_
>> > CONFIG));
>> >
>> > exactly_once
>> > null
>> > /tmp/customerStoreLocal6
>> >
>> >
>> > private static Properties getProperties() {
>> >         Properties settings = new Properties();
>> >         settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
>> >         settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>> > "localhost:9092");
>> >         settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
>> > "localhost:2181");
>> >         settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>> > Serdes.String().getClass());
>> >         settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>> > Serdes.String().getClass());
>> >
>> > settings.put(StreamsConfig.STATE_DIR_CONFIG,"/tmp/customerSt
>> oreLocal6");
>> >         settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>> > WallclockTimestampExtractor.class);
>> >
>> > settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,"exactly_once");
>> >         settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earli
>> est");
>> >         settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10
>> *
>> > 1024 * 1024L);
>> >         return settings;
>> >     }
>> >
>> > On Wed, Dec 13, 2017 at 11:53 PM, Guozhang Wang <wangg...@gmail.com>
>> > wrote:
>> >
>> > > Could you show us the testing code snippet that shows how you set the
>> > > configs and how you read from it for verification?
>> > >
>> > >
>> > >
>> > > Guozhang
>> > >
>> > > On Wed, Dec 13, 2017 at 1:07 PM, Artur Mrozowski <art...@gmail.com>
>> > wrote:
>> > >
>> > > > Hello Guozhang,
>> > > >
>> > > > I print out some values that I assign to StreamsConfig in the
>> console,
>> > > but
>> > > > the CACHE_MAX_BYTES_BUFFERING_CONFIG is always null. I disabled
>> > caching
>> > > by
>> > > > setting it to 0 today, and it seems to have the expected effect.
>> > > > But after this I am not able to assign any value to it, it is always
>> > nul.
>> > > >
>> > > > Best Regards
>> > > > Artur
>> > > >
>> > > > On Wed, Dec 13, 2017 at 5:44 PM, Guozhang Wang <wangg...@gmail.com>
>> > > wrote:
>> > > >
>> > > > > Hello Artur,
>> > > > >
>> > > > > What do you mean exactly by "It simply returns null no matter what
>> > > value
>> > > > I
>> > > > > provide."?
>> > > > >
>> > > > >
>> > > > > Guozhang
>> > > > >
>> > > > >
>> > > > > On Wed, Dec 13, 2017 at 8:02 AM, Artur Mrozowski <
>> art...@gmail.com>
>> > > > wrote:
>> > > > >
>> > > > > > Hi Bill,
>> > > > > > No, but I'll be happy to generate it. How do I generate logs for
>> > > > > > StreamsConfig?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Artur
>> > > > > >
>> > > > > > On Wed, Dec 13, 2017 at 3:44 PM, Bill Bejeck <b...@confluent.io
>> >
>> > > > wrote:
>> > > > > >
>> > > > > > > H Artur,
>> > > > > > >
>> > > > > > > Do you have any log files you can share for this issue?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Bill
>> > > > > > >
>> > > > > > > On Wed, Dec 13, 2017 at 8:15 AM, Artur Mrozowski <
>> > art...@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Actually I can see all other properties being set, except
>> for
>> > > > > > > > CACHE_MAX_BYTES_BUFFERING_CONFIG that is null.
>> > > > > > > > I use 0.11.0.2 Kafka Streams.
>> > > > > > > > Has anyone encountered this issue?
>> > > > > > > >
>> > > > > > > > /Artur
>> > > > > > > >
>> > > > > > > > On Wed, Dec 13, 2017 at 1:11 PM, Artur Mrozowski <
>> > > art...@gmail.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hi,
>> > > > > > > > > I played around with caching on Confluent platform 3.3 by
>> > first
>> > > > > > > > disabling,
>> > > > > > > > > setting to zero. Now, it seems I can not enable it again.
>> It
>> > > > simply
>> > > > > > > > returns
>> > > > > > > > > null no matter what value I provide.
>> > > > > > > > >
>> > > > > > > > > e.g
>> > > > > > > > > settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_
>> > > > > > > > > CONFIG,10*1024*1024L);
>> > > > > > > > >
>> > > > > > > > > How can I enable it again? It generates a lot of
>> duplicates.
>> > > > > > > > >
>> > > > > > > > > Best Regards
>> > > > > > > > > Artur
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > -- Guozhang
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Reply via email to