I'm not really associated with Sarama, but your issue sounds pretty much the same as one I faced and fixed some time ago: https://github.com/Shopify/sarama/issues/885
Try using msg.BlockTimestamp instead of msg.Timestamp and see if it helps.

On Tue, Jul 24, 2018 at 3:26 AM Craig Ching <craigch...@gmail.com> wrote:

> Hi Dmitry,
>
> Are you associated with the Sarama project? If so, understand that part of
> what I want is to learn about Sarama and the Kafka message format ;)
>
> The problem I'm having is that if I turn on:
>
> log.message.timestamp.type=LogAppendTime
>
> in the broker, then produce on topic1 with console producer, I will see
> timestamps in the sarama client. If I produce on topic2 with telegraf
> (incidentally, I think telegraf is a sarama producer), then I don't see
> timestamps in the sarama client. In both cases, if I consume using the
> console consumer (with --property print.timestamp=true) I *do* see
> timestamps.
>
> I'm happy to debug this issue myself and submit a PR to sarama, but I am
> missing some fundamentals of how to decode the kafka message format and
> would really like some pointers.
>
> Cheers,
> Craig
>
> P.S. Here is the sarama code I'm using to test:
>
> package main
>
> import (
>     "fmt"
>     "log"
>     "os"
>     "os/signal"
>     "time"
>
>     "github.com/Shopify/sarama"
> )
>
> func main() {
>
>     // Initialize Sarama logging
>     sarama.Logger = log.New(os.Stdout, "[Sarama] ",
>         log.Ldate|log.Lmicroseconds|log.Lshortfile)
>
>     signals := make(chan os.Signal, 1)
>     signal.Notify(signals, os.Interrupt)
>
>     config := sarama.NewConfig()
>     config.Consumer.Return.Errors = true
>     config.ClientID = "consumer-test"
>     config.Metadata.RefreshFrequency = time.Duration(5) * time.Minute
>     config.Metadata.Full = true
>     // config.Version = sarama.V0_10_0_0
>     config.Version = sarama.V1_1_0_0
>     // config.Version = sarama.V0_10_2_1
>     config.Consumer.Offsets.Initial = sarama.OffsetOldest
>
>     brokers := []string{"localhost:9092"}
>     // brokers := []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"}
>
>     client, err := sarama.NewConsumer(brokers, config)
>     if err != nil {
>         panic(err)
>     }
>
>     // topic := "topic1"
>     topic := "topic2"
>     // topic := "metric-influx-measurement"
>     // How to decide partition, is it fixed value...?
>     consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest)
>     if err != nil {
>         panic(err)
>     }
>
>     defer func() {
>         if err := client.Close(); err != nil {
>             panic(err)
>         }
>     }()
>
>     // Count how many message processed
>     msgCount := 0
>
>     go func() {
>         for {
>             select {
>             case err := <-consumer.Errors():
>                 fmt.Println(err)
>             case msg := <-consumer.Messages():
>                 msgCount++
>                 fmt.Println(msg.Timestamp)
>                 fmt.Println("Received messages", string(msg.Key), string(msg.Value))
>             case <-signals:
>                 fmt.Println("Interrupt is detected")
>                 break
>             }
>         }
>     }()
>     <-signals
> }
>
>
> On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
>
> > Hey Craig,
> >
> > what exact problem you have with Sarama client?
> >
> > On Mon, Jul 23, 2018 at 5:11 PM Craig Ching <craigch...@gmail.com> wrote:
> >
> > > Hi!
> > >
> > > I'm working on debugging a problem with how message timestamps are
> > > handled in the sarama client. In some cases, the sarama client won't
> > > associate a timestamp with a message while the kafka console consumer
> > > does. I've found the documentation on the message format here:
> > >
> > > https://kafka.apache.org/documentation/#messageformat
> > >
> > > But the information there is very sparse. For instance, what are
> > > 'firstTimestamp' and 'maxTimestamp'? It seems that when I'm debugging
> > > sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
> > > timestamp I want. Is there some state about the message that I need to
> > > understand in order to have maxTimestamp be used? Any further
> > > documentation or guidance on this would be very helpful!
> > >
> > > On another note, I am trying to debug this through the scala/java
> > > console consumer, but I'm having a hard time getting IntelliJ setup.
> > > Is there anything special or documentation I need to set this up for
> > > debugging?