Unfortunately, this didn't fix my problem.  Both timestamps (msg.Timestamp and msg.BlockTimestamp) are 0.

> On Jul 24, 2018, at 15:22, Craig Ching <craigch...@gmail.com> wrote:
> 
> Hey, thanks for that Dmitriy!  I'll have a look.
> 
>> On Tue, Jul 24, 2018 at 11:18 AM Dmitriy Vsekhvalnov 
>> <dvsekhval...@gmail.com> wrote:
>> Not really associated with Sarama.
>> 
>> But your issue sounds pretty much the same as one I faced some time ago
>> and fixed; here it is: https://github.com/Shopify/sarama/issues/885
>> 
>> Try using msg.BlockTimestamp instead of msg.Timestamp and see if it helps.
>> 
>> On Tue, Jul 24, 2018 at 3:26 AM Craig Ching <craigch...@gmail.com> wrote:
>> 
>> > Hi Dmitriy,
>> >
>> > Are you associated with the Sarama project?  If so, understand that part of
>> > what I want is to learn about Sarama and the Kafka message format ;)
>> >
>> > The problem I'm having is that if I turn on:
>> >
>> > log.message.timestamp.type=LogAppendTime
>> >
>> > in the broker and then produce to topic1 with the console producer, I
>> > see timestamps in the sarama client.  If I produce to topic2 with
>> > telegraf (incidentally, I think telegraf is a sarama producer), then I
>> > don't see timestamps in the sarama client.  In both cases, if I consume
>> > using the console consumer (with --property print.timestamp=true) I *do*
>> > see timestamps.
>> >
>> > I'm happy to debug this issue myself and submit a PR to sarama, but I am
>> > missing some fundamentals of how to decode the kafka message format and
>> > would really like some pointers.
>> >
>> > Cheers,
>> > Craig
>> >
>> > P.S.  Here is the sarama code I'm using to test:
>> >
>> > package main
>> >
>> > import (
>> >     "fmt"
>> >     "log"
>> >     "os"
>> >     "os/signal"
>> >     "time"
>> >
>> >     "github.com/Shopify/sarama"
>> > )
>> >
>> > func main() {
>> >     // Initialize Sarama logging
>> >     sarama.Logger = log.New(os.Stdout, "[Sarama] ",
>> >         log.Ldate|log.Lmicroseconds|log.Lshortfile)
>> >
>> >     signals := make(chan os.Signal, 1)
>> >     signal.Notify(signals, os.Interrupt)
>> >
>> >     config := sarama.NewConfig()
>> >     config.Consumer.Return.Errors = true
>> >     config.ClientID = "consumer-test"
>> >     config.Metadata.RefreshFrequency = time.Duration(5) * time.Minute
>> >     config.Metadata.Full = true
>> >     // config.Version = sarama.V0_10_0_0
>> >     config.Version = sarama.V1_1_0_0
>> >     // config.Version = sarama.V0_10_2_1
>> >     config.Consumer.Offsets.Initial = sarama.OffsetOldest
>> >
>> >     brokers := []string{"localhost:9092"}
>> >     // brokers := []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"}
>> >
>> >     client, err := sarama.NewConsumer(brokers, config)
>> >     if err != nil {
>> >         panic(err)
>> >     }
>> >
>> >     // topic := "topic1"
>> >     topic := "topic2"
>> >     // topic := "metric-influx-measurement"
>> >     // How should the partition be chosen? Is it a fixed value?
>> >     consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest)
>> >     if err != nil {
>> >         panic(err)
>> >     }
>> >
>> >     defer func() {
>> >         if err := client.Close(); err != nil {
>> >             panic(err)
>> >         }
>> >     }()
>> >
>> >     // Count how many messages have been processed
>> >     msgCount := 0
>> >
>> >     // Consume in the main goroutine. A bare break inside select only
>> >     // leaves the select, so a labeled break is used to stop on interrupt.
>> > loop:
>> >     for {
>> >         select {
>> >         case err := <-consumer.Errors():
>> >             fmt.Println(err)
>> >         case msg := <-consumer.Messages():
>> >             msgCount++
>> >             fmt.Println(msg.Timestamp)
>> >             fmt.Println("Received messages", string(msg.Key), string(msg.Value))
>> >         case <-signals:
>> >             fmt.Println("Interrupt is detected")
>> >             break loop
>> >         }
>> >     }
>> >     fmt.Println("Processed", msgCount, "messages")
>> > }
>> >
>> >
>> > On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
>> >
>> > > Hey Craig,
>> > >
>> > > What exact problem are you having with the Sarama client?
>> > >
>> > > On Mon, Jul 23, 2018 at 5:11 PM Craig Ching <craigch...@gmail.com> wrote:
>> > >
>> > > > Hi!
>> > > >
>> > > > I'm working on debugging a problem with how message timestamps are
>> > > > handled in the sarama client.  In some cases, the sarama client won't
>> > > > associate a timestamp with a message while the kafka console consumer
>> > > > does.  I've found the documentation on the message format here:
>> > > >
>> > > > https://kafka.apache.org/documentation/#messageformat
>> > > >
>> > > > But the information there is very sparse.  For instance, what are
>> > > > 'firstTimestamp' and 'maxTimestamp'?  It seems that when I'm debugging
>> > > > sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
>> > > > timestamp I want.  Is there some state about the message that I need to
>> > > > understand in order to have maxTimestamp be used?  Any further
>> > > > documentation or guidance on this would be very helpful!
>> > > >
>> > > > On another note, I am trying to debug this through the scala/java
>> > > > console consumer, but I'm having a hard time getting IntelliJ set up.
>> > > > Is there anything special I need to do, or any documentation I should
>> > > > follow, to set this up for debugging?
>> > > >
>> > >
>> >
