-----BEGIN PGP SIGNED MESSAGE----- Hash: SHA512 First a hint about "group.id". Please read this to make sense of this parameter:
http://stackoverflow.com/documentation/apache-kafka/5449/consumer-groups - -and-offset-management It might also help to understand how to get the "last value" of a topic. I also want to mention, that "last value" is a moving point in general, as new data might be appended at any time. Thus, the definition of "last value" is not completely sounds. For my code snipped below, I assume you do have a topic with no running producers and thus no new data gets appended. For this case, you need to seek() to "end minus one" offset and afterwards poll() for the record. The simplest way might be to use something like this > Map<TopicPartition, Long> headOffsets = > consumer.endOffsets(Collection<TopicPartition> partitions) > for(Entry<TopicPartition, Long> topicPlusOffset : headOffsets) { > consumer.seek(topicPlusOffset.getKey(), topicPlusOffset.getValue() > - 1); } Afterwards you can just call poll() and it should return the last message for each partition (depending on you number of partitions, not for all partitions in a single call to poll() though) Hope this helps. - -Matthias On 11/3/16 5:26 AM, Furkan KAMACI wrote: > I've just realised the parameter of poll method. It's been > explained as: > > "The time, in milliseconds, spent waiting in poll if data is not > available in the buffer." > > When I set to a big number ''sometimes" I can see a result in it. > When I set it to 0 and push something to do topic that it listens > still "sometimes" I can see a result. > > What I want is to get the last value of that topic? > > Kind Regards, Furkan KAMACI > > On Thu, Nov 3, 2016 at 1:36 PM, Furkan KAMACI > <furkankam...@gmail.com> wrote: > >> Hi Matthias, >> >> Thanks for the response. I stream output as follows: >> >> longCounts.toStream((wk, v) -> wk.key()) .to(Serdes.String(), >> Serdes.Long(), "qps-aggregated"); >> >> I want to read last value from that topic at another application. >> I've tried that: >> >> Properties props = new Properties(); >> props.put("bootstrap.servers", "localhost:9092"); >> props.put("group.id", "qps-consumer"); *//I'dont know the real >> purpose of this setting* props.put("enable.auto.commit", >> "true"); props.put("auto.commit.interval.ms", "1000"); >> props.put("session.timeout.ms", "30000"); >> props.put("key.deserializer", "org.apache.kafka.common. >> serialization.StringDeserializer"); >> props.put("value.deserializer", "org.apache.kafka.common. >> serialization.LongDeserializer"); KafkaConsumer<String, String> >> consumer = new KafkaConsumer<>(props); >> consumer.subscribe(Collections.singletonList("qps-aggregated")); >> ConsumerRecords<String, String> records = consumer.poll(1); for >> (ConsumerRecord<String, String> record : records) { >> System.out.printf("Connected! offset = %d, key = %s, value = %s", >> record.offset(), record.key(), record.value()); } >> >> I can see that there is data when I check the streamed topic >> (qps-aggregated) from command line. However, I cannot get any >> result from that subscription via my application. What can be the >> reason? >> >> Kind Regards, Furkan KAMACI >> >> On Wed, Nov 2, 2016 at 10:58 PM, Matthias J. Sax >> <matth...@confluent.io> wrote: >> > Hi, > > first, AUTO_OFFSET_RESET_CONFIG has only an effect if you start up > you application for the first time. If you start it a second time, > it will resume from where is left off. > > About getting numbers starting from zero: this is expected > behavior because streams **updates** the window computation each > time an input record is added to the window. So you see each > intermediate result. > > Furthermore, each time a new window is created, you will see a "1" > again in the output as this is the current count of the new window. > If you want do distinguish windows in the output, you need to look > at the key. It encode the original record-key as well as a window > ID. > > > -Matthias > > On 11/2/16 12:13 PM, Furkan KAMACI wrote: >>>>> I use Kafka 0.10.0.1. I count the messages of a topic as >>>>> follows: >>>>> >>>>> ... >>>>> streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, >>>>> >>>>> "earliest"); ... KStream<String, String> longs = >>>>> builder.stream(Serdes.String(), Serdes.String(), >>>>> "qps-input"); ... KTable<Windowed<String>, Long> longCounts >>>>> = longs.countByKey(TimeWindows.of("qps", 3600 * 1000), >>>>> Serdes.String()); ... >>>>> >>>>> and then I write output to another topic. Result is that: >>>>> >>>>> Numbers which starts from 1 and increase whenever I add >>>>> something to qps-input. >>>>> >>>>> My questions: >>>>> >>>>> 1) Does it calculate really last hour or everything from >>>>> the beginning due you I've set it as earliest? >>>>> >>>>> 2) Sometimes it's been reset and numbers starts from 1. >>>>> What can be the reason for that? >>>>> >>>>> Kind Regards, Furkan KAMACI >>>>> >>> >> >> > -----BEGIN PGP SIGNATURE----- Comment: GPGTools - https://gpgtools.org iQIcBAEBCgAGBQJYG4NTAAoJECnhiMLycopPGNgP/iF95vXFLvzVD585e+8ny7VE ykhNIGXQ725yY6bp++bK3WJoyhCt5GXKWULoJc+Zvs+3QJUPcG0zfuIvWUr2pZwf m+1y0wVUK8tCVooIa/Bv2Hhrw3HhEt88518Puvl444zQmBQF+K3YTqHlxjmUMvem NYXv7CMqkRngmWxdgbjUr+WY0ISwCOZRQxB8NDnmnXbxgXucVpBcqDYfcNIrMUJJ UalVa1+JKvy10FpMMXjcZuatJ+YdE7ueKeAmIXV/W50ICuCdEj/WouvXPnnOUfcr mhydq8H5FO566+pf6v80+kn0sJDb9gedUcNBKS89TLZH/IRQjo/u4Go8itsfSFI1 ykVMe6YgLNKuNTW4qqG05TBivV+Mgieyt+0FGLhF60zS9wJjCAoo1o+eSxIn/b1N ruLpDkIyFKWsO3NhdRxEn6YVoYuo4cc6trwAsxpGMFH92IWc1fY5hGIobm409IIG 0IqTQ3OvmapgDvZh0S90XHC8zzj1nsLtznmJlozUdPBAO0g3N1Fn5BQiIzbrV096 kKn8vT0r3M7izi/gP7Y7ylV6w3AK7SL+O7Ryy3H5tGNtLJ3xgns3vqnRaMc76MrN 9kyR2BDF0stFPPX4WSXmLbveq1kCW68ul9humhNJHZLcO9HqGUrXIh95HFTmW+On 040594zj4yDH/CRiIPbj =lJDC -----END PGP SIGNATURE-----