It might also help to understand how to get the "last value" of a
topic. I also want to mention that the "last value" is a moving target
in general, as new data might be appended at any time. Thus, the
definition of "last value" is not completely sound.

For my code snippet below, I assume you have a topic with no
running producers, so no new data gets appended. In this case,
you need to seek() to the "end minus one" offset and afterwards poll()
for the record.

The simplest way might be to use something like this:

> // partitions: the Collection<TopicPartition> assigned to this consumer
> Map<TopicPartition, Long> headOffsets = consumer.endOffsets(partitions);
> for (Map.Entry<TopicPartition, Long> topicPlusOffset : headOffsets.entrySet()) {
>     consumer.seek(topicPlusOffset.getKey(), topicPlusOffset.getValue() - 1);
> }

Afterwards you can just call poll() and it should return the last
message for each partition (depending on your number of partitions,
though, not necessarily for all partitions in a single call to poll()).
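
One edge case with "end minus one": for a partition that has no records,
the target offset is not valid (it is -1 for a partition that was never
written to, and seek() rejects negative offsets). Below is a minimal
sketch of how the seek targets could be computed while skipping such
partitions. The class and method names are made up for illustration, and
the generic key type P stands in for TopicPartition so that the sketch
runs without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

class LastOffsets {
    // For each partition, returns the offset to seek to in order to read
    // its last record ("end minus one"). Partitions without any readable
    // record (end <= beginning) are left out of the result.
    static <P> Map<P, Long> lastRecordOffsets(Map<P, Long> beginningOffsets,
                                              Map<P, Long> endOffsets) {
        Map<P, Long> targets = new HashMap<>();
        for (Map.Entry<P, Long> entry : endOffsets.entrySet()) {
            long end = entry.getValue();
            // Assumption: a partition missing from beginningOffsets starts at 0.
            long beginning = beginningOffsets.getOrDefault(entry.getKey(), 0L);
            if (end > beginning) {
                targets.put(entry.getKey(), end - 1);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, Long> beginning = new HashMap<>();
        Map<String, Long> end = new HashMap<>();
        beginning.put("qps-aggregated-0", 0L);
        end.put("qps-aggregated-0", 42L);
        beginning.put("qps-aggregated-1", 0L);
        end.put("qps-aggregated-1", 0L); // empty partition, gets skipped
        System.out.println(lastRecordOffsets(beginning, end)); // {qps-aggregated-0=41}
    }
}
```

With a real consumer, the two maps would presumably come from
consumer.beginningOffsets(partitions) and consumer.endOffsets(partitions),
and each entry of the result would be passed to consumer.seek().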

Hope this helps.

-Matthias

On 11/3/16 5:26 AM, Furkan KAMACI wrote:
> I've just realised the parameter of poll method. It's been
> explained as:
> "The time, in milliseconds, spent waiting in poll if data is not
> available in the buffer."
> When I set it to a big number, "sometimes" I can see a result in it.
> When I set it to 0 and push something to the topic that it listens to,
> I can still "sometimes" see a result.
> What I want is to get the last value of that topic.
> Kind Regards, Furkan KAMACI
> On Thu, Nov 3, 2016 at 1:36 PM, Furkan KAMACI
> <furkankam...@gmail.com> wrote:
>> Hi Matthias,
>> Thanks for the response. I stream the output as follows:
>> longCounts.toStream((wk, v) -> wk.key())
>>     .to(Serdes.String(), Serdes.Long(), "qps-aggregated");
>> I want to read the last value from that topic in another application.
>> I've tried this:
>> Properties props = new Properties();
>> props.put("bootstrap.servers", "localhost:9092");
>> props.put("group.id", "qps-consumer"); // I don't know the real purpose of this setting
>> props.put("enable.auto.commit", "true");
>> props.put("auto.commit.interval.ms", "1000");
>> props.put("session.timeout.ms", "30000");
>> props.put("key.deserializer",
>>     "org.apache.kafka.common.serialization.StringDeserializer");
>> props.put("value.deserializer",
>>     "org.apache.kafka.common.serialization.LongDeserializer");
>> KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>> consumer.subscribe(Collections.singletonList("qps-aggregated"));
>> ConsumerRecords<String, String> records = consumer.poll(1);
>> for (ConsumerRecord<String, String> record : records) {
>>     System.out.printf("Connected! offset = %d, key = %s, value = %s",
>>         record.offset(), record.key(), record.value());
>> }
>> I can see that there is data when I check the streamed topic 
>> (qps-aggregated) from command line. However, I cannot get any
>> result from that subscription via my application. What can be the
>> reason?
>> Kind Regards, Furkan KAMACI
>> On Wed, Nov 2, 2016 at 10:58 PM, Matthias J. Sax
>> <matth...@confluent.io> wrote:
> Hi,
> first, AUTO_OFFSET_RESET_CONFIG only has an effect if you start up
> your application for the first time. If you start it a second time,
> it will resume from where it left off.
> About getting numbers starting from zero: this is expected
> behavior because streams **updates** the window computation each
> time an input record is added to the window. So you see each
> intermediate result.
> Furthermore, each time a new window is created, you will see a "1" 
> again in the output as this is the current count of the new window.
> If you want to distinguish windows in the output, you need to look
> at the key. It encodes the original record key as well as a window
> ID.
> -Matthias
> On 11/2/16 12:13 PM, Furkan KAMACI wrote:
>>>>> I use Kafka. I count the messages of a topic as
>>>>> follows:
>>>>> ...
>>>>> streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>>>>>     "earliest");
>>>>> ...
>>>>> KStream<String, String> longs =
>>>>>     builder.stream(Serdes.String(), Serdes.String(), "qps-input");
>>>>> ...
>>>>> KTable<Windowed<String>, Long> longCounts =
>>>>>     longs.countByKey(TimeWindows.of("qps", 3600 * 1000), Serdes.String());
>>>>> ...
>>>>> and then I write the output to another topic. The result is
>>>>> numbers which start from 1 and increase whenever I add
>>>>> something to qps-input.
>>>>> My questions:
>>>>> 1) Does it really calculate the last hour, or everything from
>>>>> the beginning because I've set it to earliest?
>>>>> 2) Sometimes it gets reset and the numbers start from 1 again.
>>>>> What can be the reason for that?
>>>>> Kind Regards, Furkan KAMACI
