I've just realised the parameter of poll method. It's been explained as:

"The time, in milliseconds, spent waiting in poll if data is not available
in the buffer."

When I set to a big number ''sometimes" I can see a result in it. When I
set it to 0 and push something to do topic that it listens still
"sometimes" I can see a result.

What I want is to get the last value of that topic?

Kind Regards,
Furkan KAMACI

On Thu, Nov 3, 2016 at 1:36 PM, Furkan KAMACI <furkankam...@gmail.com>
wrote:

> Hi Matthias,
>
> Thanks for the response. I stream output as follows:
>
>         longCounts.toStream((wk, v) -> wk.key())
>                 .to(Serdes.String(),
>                         Serdes.Long(),
>                         "qps-aggregated");
>
> I want to read last value from that topic at another application. I've
> tried that:
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "qps-consumer"); *//I'dont know the real
> purpose of this setting*
>         props.put("enable.auto.commit", "true");
>         props.put("auto.commit.interval.ms", "1000");
>         props.put("session.timeout.ms", "30000");
>         props.put("key.deserializer", "org.apache.kafka.common.
> serialization.StringDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.
> serialization.LongDeserializer");
>         KafkaConsumer<String, String> consumer = new
> KafkaConsumer<>(props);
>         consumer.subscribe(Collections.singletonList("qps-aggregated"));
>         ConsumerRecords<String, String> records = consumer.poll(1);
>         for (ConsumerRecord<String, String> record : records) {
>             System.out.printf("Connected! offset = %d, key = %s, value =
> %s", record.offset(), record.key(), record.value());
>         }
>
> I can see that there is data when I check the streamed topic
> (qps-aggregated) from command line. However, I cannot get any result from
> that subscription via my application. What can be the reason?
>
> Kind Regards,
> Furkan KAMACI
>
> On Wed, Nov 2, 2016 at 10:58 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> -----BEGIN PGP SIGNED MESSAGE-----
>> Hash: SHA512
>>
>> Hi,
>>
>> first, AUTO_OFFSET_RESET_CONFIG has only an effect if you start up you
>> application for the first time. If you start it a second time, it will
>> resume from where is left off.
>>
>> About getting numbers starting from zero: this is expected behavior
>> because streams **updates** the window computation each time an input
>> record is added to the window. So you see each intermediate result.
>>
>> Furthermore, each time a new window is created, you will see a "1"
>> again in the output as this is the current count of the new window. If
>> you want do distinguish windows in the output, you need to look at the
>> key. It encode the original record-key as well as a window ID.
>>
>>
>> - -Matthias
>>
>> On 11/2/16 12:13 PM, Furkan KAMACI wrote:
>> > I use Kafka 0.10.0.1. I count the messages of a topic as follows:
>> >
>> > ...
>> > streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>> > "earliest"); ... KStream<String, String> longs =
>> > builder.stream(Serdes.String(), Serdes.String(), "qps-input"); ...
>> > KTable<Windowed<String>, Long> longCounts =
>> > longs.countByKey(TimeWindows.of("qps", 3600 * 1000),
>> > Serdes.String()); ...
>> >
>> > and then I write output to another topic. Result is that:
>> >
>> > Numbers which starts from 1 and increase whenever I add something
>> > to qps-input.
>> >
>> > My questions:
>> >
>> > 1) Does it calculate really last hour or everything from the
>> > beginning due you I've set it as earliest?
>> >
>> > 2) Sometimes it's been reset and numbers starts from 1. What can be
>> > the reason for that?
>> >
>> > Kind Regards, Furkan KAMACI
>> >
>> -----BEGIN PGP SIGNATURE-----
>> Comment: GPGTools - https://gpgtools.org
>>
>> iQIcBAEBCgAGBQJYGlN3AAoJECnhiMLycopPzesP/jo9pX7hM03WeXEMvsGLpUgz
>> N0/vqH9roEQOT/LoZacwV62CYZ7UvITU/G7hLymp9s8Q1g3+7phdc9OPI2Vy2WFT
>> RgpK3WYVYK7lKOZiE8i/n/Ibu9H2SJAYBdkyse1RsuMGACLEuOoASV6P67QZKIGI
>> Cw9Eq5IQDLBPpWoeUfofWIJtFEFF4DtT52zY7CFryKsRngWDZtBcGcqt0mqUrVM6
>> vvlCuRsxB/1/n/IzmCF3JqmSL7TSsNrSu2ULKgG0K/+71SxPpzNhLZSlAs92zQH+
>> APPWgu4s0Kq4IIzje6eQiny82354zg0E3xbVTC+Ra3o0PEX/skKUdlcj1GA1Yvf8
>> sFaGDzXjrhQa9ZmCPYSDyveZRlUKmP6QGdPJro+EIKnOv4VTxsF9LPiiQzDds/sc
>> bMjCRP+kZdFpow9IcjsLGo39Cu2mVCg7ChbaGVnvVaZ8pZuPdASTbLhWeUPXNhjv
>> XPEkxqPFexdRL38idWh0CcWv++Dr2Dvbu2lRBDc9SPqRcgzF51pmAmau/TW3WV+J
>> 8iVL+OH0TRhRx+L3Ie3tiahInXvf7Fwwwmc1fJASeN54zhhJnU8vSVYA0JDX0+N8
>> BPVnSoIdHEnCmlFNm1vxxcCk65Fjug+AZQpHCmZzepHTg6LcdNHR9TH9iaTrvjr1
>> 6gi7YNmGkeE+jzTf/YC9
>> =Vq3G
>> -----END PGP SIGNATURE-----
>>
>
>

Reply via email to