I've just noticed the timeout parameter of the poll method. It is documented as: "The time, in milliseconds, spent waiting in poll if data is not available in the buffer."
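To make that "wait up to a timeout" behaviour concrete without needing a broker, here is a minimal sketch that uses `java.util.concurrent.BlockingQueue` as a stand-in for the consumer's buffer. This is not the Kafka API: the class name `PollTimeoutSketch` and the helper `pollUntil` are made up for illustration. It only shows why a single poll with a tiny timeout can come back empty, and why polling is normally done in a loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollTimeoutSketch {

    // Hypothetical helper: polls repeatedly until a value arrives or the
    // overall deadline passes. Returns null if nothing arrived in time.
    static Long pollUntil(BlockingQueue<Long> buffer, long pollMs, long deadlineMs)
            throws InterruptedException {
        long end = System.currentTimeMillis() + deadlineMs;
        while (System.currentTimeMillis() < end) {
            // Waits at most pollMs for data, like a poll timeout.
            Long value = buffer.poll(pollMs, TimeUnit.MILLISECONDS);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the consumer's buffer (assumption, not Kafka).
        BlockingQueue<Long> buffer = new LinkedBlockingQueue<>();

        // A producer thread that delivers one value after a short delay.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
                buffer.put(42L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Timeout 0 does not wait at all, so it usually finds nothing yet.
        Long immediate = buffer.poll(0, TimeUnit.MILLISECONDS);
        System.out.println("single poll(0): " + immediate);

        // Polling in a loop gives the data time to arrive.
        Long looped = pollUntil(buffer, 50, 5000);
        System.out.println("poll loop: " + looped);

        producer.join();
    }
}
```

With a real `KafkaConsumer` the same idea applies: `poll` is usually called repeatedly rather than once, because the first calls may return before any records have been fetched.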
When I set it to a large number, "sometimes" I can see a result. When I set it to 0 and push something to the topic that it listens to, I still only "sometimes" see a result. How can I get the last value of that topic?

Kind Regards,
Furkan KAMACI

On Thu, Nov 3, 2016 at 1:36 PM, Furkan KAMACI <furkankam...@gmail.com> wrote:

> Hi Matthias,
>
> Thanks for the response. I stream output as follows:
>
>     longCounts.toStream((wk, v) -> wk.key())
>               .to(Serdes.String(),
>                   Serdes.Long(),
>                   "qps-aggregated");
>
> I want to read the last value from that topic in another application. I've
> tried this:
>
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "qps-consumer"); // I don't know the real purpose of this setting
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("session.timeout.ms", "30000");
>     props.put("key.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer",
>         "org.apache.kafka.common.serialization.LongDeserializer");
>     KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
>     consumer.subscribe(Collections.singletonList("qps-aggregated"));
>     ConsumerRecords<String, Long> records = consumer.poll(1);
>     for (ConsumerRecord<String, Long> record : records) {
>         System.out.printf("Connected! offset = %d, key = %s, value = %s",
>             record.offset(), record.key(), record.value());
>     }
>
> I can see that there is data when I check the streamed topic
> (qps-aggregated) from the command line. However, I cannot get any result from
> that subscription via my application. What can be the reason?
>
> Kind Regards,
> Furkan KAMACI
>
> On Wed, Nov 2, 2016 at 10:58 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> -----BEGIN PGP SIGNED MESSAGE-----
>> Hash: SHA512
>>
>> Hi,
>>
>> first, AUTO_OFFSET_RESET_CONFIG has only an effect if you start up your
>> application for the first time.
>> If you start it a second time, it will resume from where it left off.
>>
>> About getting numbers starting from zero: this is expected behavior
>> because streams **updates** the window computation each time an input
>> record is added to the window. So you see each intermediate result.
>>
>> Furthermore, each time a new window is created, you will see a "1"
>> again in the output as this is the current count of the new window. If
>> you want to distinguish windows in the output, you need to look at the
>> key. It encodes the original record-key as well as a window ID.
>>
>>
>> - -Matthias
>>
>> On 11/2/16 12:13 PM, Furkan KAMACI wrote:
>> > I use Kafka 0.10.0.1. I count the messages of a topic as follows:
>> >
>> > ...
>> > streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>> >     "earliest");
>> > ...
>> > KStream<String, String> longs =
>> >     builder.stream(Serdes.String(), Serdes.String(), "qps-input");
>> > ...
>> > KTable<Windowed<String>, Long> longCounts =
>> >     longs.countByKey(TimeWindows.of("qps", 3600 * 1000),
>> >         Serdes.String());
>> > ...
>> >
>> > and then I write the output to another topic. The result is:
>> >
>> > Numbers which start from 1 and increase whenever I add something
>> > to qps-input.
>> >
>> > My questions:
>> >
>> > 1) Does it really calculate the last hour, or everything from the
>> > beginning because I've set it to earliest?
>> >
>> > 2) Sometimes it is reset and the numbers start from 1 again. What can be
>> > the reason for that?
>> >
>> > Kind Regards, Furkan KAMACI
>> >
>> -----BEGIN PGP SIGNATURE-----
>> Comment: GPGTools - https://gpgtools.org
>>
>> iQIcBAEBCgAGBQJYGlN3AAoJECnhiMLycopPzesP/jo9pX7hM03WeXEMvsGLpUgz
>> N0/vqH9roEQOT/LoZacwV62CYZ7UvITU/G7hLymp9s8Q1g3+7phdc9OPI2Vy2WFT
>> RgpK3WYVYK7lKOZiE8i/n/Ibu9H2SJAYBdkyse1RsuMGACLEuOoASV6P67QZKIGI
>> Cw9Eq5IQDLBPpWoeUfofWIJtFEFF4DtT52zY7CFryKsRngWDZtBcGcqt0mqUrVM6
>> vvlCuRsxB/1/n/IzmCF3JqmSL7TSsNrSu2ULKgG0K/+71SxPpzNhLZSlAs92zQH+
>> APPWgu4s0Kq4IIzje6eQiny82354zg0E3xbVTC+Ra3o0PEX/skKUdlcj1GA1Yvf8
>> sFaGDzXjrhQa9ZmCPYSDyveZRlUKmP6QGdPJro+EIKnOv4VTxsF9LPiiQzDds/sc
>> bMjCRP+kZdFpow9IcjsLGo39Cu2mVCg7ChbaGVnvVaZ8pZuPdASTbLhWeUPXNhjv
>> XPEkxqPFexdRL38idWh0CcWv++Dr2Dvbu2lRBDc9SPqRcgzF51pmAmau/TW3WV+J
>> 8iVL+OH0TRhRx+L3Ie3tiahInXvf7Fwwwmc1fJASeN54zhhJnU8vSVYA0JDX0+N8
>> BPVnSoIdHEnCmlFNm1vxxcCk65Fjug+AZQpHCmZzepHTg6LcdNHR9TH9iaTrvjr1
>> 6gi7YNmGkeE+jzTf/YC9
>> =Vq3G
>> -----END PGP SIGNATURE-----
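
The window behaviour described in the quoted reply (every input record re-emits the updated count of its window, and a new window starts again at 1) can be reproduced with a small plain-Java simulation. This is only a sketch under assumptions: it does not use the Kafka Streams API, it models simple tumbling windows keyed by their start time, and the names `WindowCountSketch` and `countPerWindow` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowCountSketch {

    // Emits one "windowStart=count" update per input record, mimicking how
    // each new record updates (and re-emits) the count of its window.
    static List<String> countPerWindow(long[] timestampsMs, long windowSizeMs) {
        Map<Long, Long> counts = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (long ts : timestampsMs) {
            // Start of the tumbling window this record falls into.
            long windowStart = ts - (ts % windowSizeMs);
            // Increment the running count for that window.
            long count = counts.merge(windowStart, 1L, Long::sum);
            updates.add(windowStart + "=" + count);
        }
        return updates;
    }

    public static void main(String[] args) {
        long hour = 3600 * 1000L;
        // Three records in the first hour, two in the second.
        long[] timestamps = {1_000L, 2_000L, 3_000L, hour + 500L, hour + 900L};
        for (String update : countPerWindow(timestamps, hour)) {
            System.out.println(update);
        }
        // Prints:
        // 0=1
        // 0=2
        // 0=3
        // 3600000=1
        // 3600000=2
    }
}
```

The left-hand side of each line plays the role of the window ID in the key, which is what tells the two runs of counts apart when the value drops back to 1.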