-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

First a hint about "group.id". Please read this to make sense of this
parameter:

http://stackoverflow.com/documentation/apache-kafka/5449/consumer-groups
- -and-offset-management

It might also help to understand how to get the "last value" of a
topic. I also want to mention, that "last value" is a moving point in
general, as new data might be appended at any time. Thus, the
definition of "last value" is not completely sounds.

For my code snipped below, I assume you do have a topic with no
running producers and thus no new data gets appended. For this case,
you need to seek() to "end minus one" offset and afterwards poll() for
the record.

The simplest way might be to use something like this

> Map<TopicPartition, Long> headOffsets =
> consumer.endOffsets(Collection<TopicPartition> partitions) 
> for(Entry<TopicPartition, Long> topicPlusOffset : headOffsets) { 
> consumer.seek(topicPlusOffset.getKey(), topicPlusOffset.getValue()
> - 1); }

Afterwards you can just call poll() and it should return the last
message for each partition (depending on you number of partitions, not
for all partitions in a single call to poll() though)


Hope this helps.

- -Matthias


On 11/3/16 5:26 AM, Furkan KAMACI wrote:
> I've just realised the parameter of poll method. It's been
> explained as:
> 
> "The time, in milliseconds, spent waiting in poll if data is not
> available in the buffer."
> 
> When I set to a big number ''sometimes" I can see a result in it.
> When I set it to 0 and push something to do topic that it listens
> still "sometimes" I can see a result.
> 
> What I want is to get the last value of that topic?
> 
> Kind Regards, Furkan KAMACI
> 
> On Thu, Nov 3, 2016 at 1:36 PM, Furkan KAMACI
> <furkankam...@gmail.com> wrote:
> 
>> Hi Matthias,
>> 
>> Thanks for the response. I stream output as follows:
>> 
>> longCounts.toStream((wk, v) -> wk.key()) .to(Serdes.String(), 
>> Serdes.Long(), "qps-aggregated");
>> 
>> I want to read last value from that topic at another application.
>> I've tried that:
>> 
>> Properties props = new Properties(); 
>> props.put("bootstrap.servers", "localhost:9092"); 
>> props.put("group.id", "qps-consumer"); *//I'dont know the real 
>> purpose of this setting* props.put("enable.auto.commit",
>> "true"); props.put("auto.commit.interval.ms", "1000"); 
>> props.put("session.timeout.ms", "30000"); 
>> props.put("key.deserializer", "org.apache.kafka.common. 
>> serialization.StringDeserializer"); 
>> props.put("value.deserializer", "org.apache.kafka.common. 
>> serialization.LongDeserializer"); KafkaConsumer<String, String>
>> consumer = new KafkaConsumer<>(props); 
>> consumer.subscribe(Collections.singletonList("qps-aggregated")); 
>> ConsumerRecords<String, String> records = consumer.poll(1); for
>> (ConsumerRecord<String, String> record : records) { 
>> System.out.printf("Connected! offset = %d, key = %s, value = %s",
>> record.offset(), record.key(), record.value()); }
>> 
>> I can see that there is data when I check the streamed topic 
>> (qps-aggregated) from command line. However, I cannot get any
>> result from that subscription via my application. What can be the
>> reason?
>> 
>> Kind Regards, Furkan KAMACI
>> 
>> On Wed, Nov 2, 2016 at 10:58 PM, Matthias J. Sax
>> <matth...@confluent.io> wrote:
>> 
> Hi,
> 
> first, AUTO_OFFSET_RESET_CONFIG has only an effect if you start up
> you application for the first time. If you start it a second time,
> it will resume from where is left off.
> 
> About getting numbers starting from zero: this is expected
> behavior because streams **updates** the window computation each
> time an input record is added to the window. So you see each
> intermediate result.
> 
> Furthermore, each time a new window is created, you will see a "1" 
> again in the output as this is the current count of the new window.
> If you want do distinguish windows in the output, you need to look
> at the key. It encode the original record-key as well as a window
> ID.
> 
> 
> -Matthias
> 
> On 11/2/16 12:13 PM, Furkan KAMACI wrote:
>>>>> I use Kafka 0.10.0.1. I count the messages of a topic as
>>>>> follows:
>>>>> 
>>>>> ... 
>>>>> streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>>>>>
>>>>> 
"earliest"); ... KStream<String, String> longs =
>>>>> builder.stream(Serdes.String(), Serdes.String(),
>>>>> "qps-input"); ... KTable<Windowed<String>, Long> longCounts
>>>>> = longs.countByKey(TimeWindows.of("qps", 3600 * 1000), 
>>>>> Serdes.String()); ...
>>>>> 
>>>>> and then I write output to another topic. Result is that:
>>>>> 
>>>>> Numbers which starts from 1 and increase whenever I add
>>>>> something to qps-input.
>>>>> 
>>>>> My questions:
>>>>> 
>>>>> 1) Does it calculate really last hour or everything from
>>>>> the beginning due you I've set it as earliest?
>>>>> 
>>>>> 2) Sometimes it's been reset and numbers starts from 1.
>>>>> What can be the reason for that?
>>>>> 
>>>>> Kind Regards, Furkan KAMACI
>>>>> 
>>> 
>> 
>> 
> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJYG4NTAAoJECnhiMLycopPGNgP/iF95vXFLvzVD585e+8ny7VE
ykhNIGXQ725yY6bp++bK3WJoyhCt5GXKWULoJc+Zvs+3QJUPcG0zfuIvWUr2pZwf
m+1y0wVUK8tCVooIa/Bv2Hhrw3HhEt88518Puvl444zQmBQF+K3YTqHlxjmUMvem
NYXv7CMqkRngmWxdgbjUr+WY0ISwCOZRQxB8NDnmnXbxgXucVpBcqDYfcNIrMUJJ
UalVa1+JKvy10FpMMXjcZuatJ+YdE7ueKeAmIXV/W50ICuCdEj/WouvXPnnOUfcr
mhydq8H5FO566+pf6v80+kn0sJDb9gedUcNBKS89TLZH/IRQjo/u4Go8itsfSFI1
ykVMe6YgLNKuNTW4qqG05TBivV+Mgieyt+0FGLhF60zS9wJjCAoo1o+eSxIn/b1N
ruLpDkIyFKWsO3NhdRxEn6YVoYuo4cc6trwAsxpGMFH92IWc1fY5hGIobm409IIG
0IqTQ3OvmapgDvZh0S90XHC8zzj1nsLtznmJlozUdPBAO0g3N1Fn5BQiIzbrV096
kKn8vT0r3M7izi/gP7Y7ylV6w3AK7SL+O7Ryy3H5tGNtLJ3xgns3vqnRaMc76MrN
9kyR2BDF0stFPPX4WSXmLbveq1kCW68ul9humhNJHZLcO9HqGUrXIh95HFTmW+On
040594zj4yDH/CRiIPbj
=lJDC
-----END PGP SIGNATURE-----

Reply via email to