Unfortunately, this didn’t fix my problem. Both timestamps are 0.

> On Jul 24, 2018, at 15:22, Craig Ching <craigch...@gmail.com> wrote:
>
> Hey, thanks for that Dmitriy! I'll have a look.
>
>> On Tue, Jul 24, 2018 at 11:18 AM Dmitriy Vsekhvalnov
>> <dvsekhval...@gmail.com> wrote:
>> Not really associated with Sarama.
>>
>> But your issues sounds pretty much same i faced some time ago and fixed,
>> here it is: https://github.com/Shopify/sarama/issues/885
>>
>> Try using msg.BlockTimestamp instead of msg.Timestamp and see if it helps.
>>
>> On Tue, Jul 24, 2018 at 3:26 AM Craig Ching <craigch...@gmail.com> wrote:
>>
>> > Hi Dmitry,
>> >
>> > Are you associated with the Sarama project? If so, understand that part of
>> > what I want is to learn about Sarama and the Kafka message format ;)
>> >
>> > The problem I'm having is that if I turn on:
>> >
>> > log.message.timestamp.type=LogAppendTime
>> >
>> > in the broker, then produce on topic1 with console producer, I will see
>> > timestamps in the sarama client. If I produce on topic2 with telegraf
>> > (incidentally, I think telegraf is a sarama producer), then I don't see
>> > timestamps in the sarama client. In both cases, if I consume using the
>> > console consumer (with --property print.timestamp=true) I *do* see
>> > timestamps.
>> >
>> > I'm happy to debug this issue myself and submit a PR to sarama, but I am
>> > missing some fundamentals of how to decode the kafka message format and
>> > would really like some pointers.
>> >
>> > Cheers,
>> > Craig
>> >
>> > P.S. Here is the sarama code I'm using to test:
>> >
>> > package main
>> >
>> > import (
>> >     "fmt"
>> >     "log"
>> >     "os"
>> >     "os/signal"
>> >     "time"
>> >
>> >     "github.com/Shopify/sarama"
>> > )
>> >
>> > func main() {
>> >
>> >     // Initialize Sarama logging
>> >     sarama.Logger = log.New(os.Stdout, "[Sarama] ",
>> >         log.Ldate|log.Lmicroseconds|log.Lshortfile)
>> >
>> >     signals := make(chan os.Signal, 1)
>> >     signal.Notify(signals, os.Interrupt)
>> >
>> >     config := sarama.NewConfig()
>> >     config.Consumer.Return.Errors = true
>> >     config.ClientID = "consumer-test"
>> >     config.Metadata.RefreshFrequency = time.Duration(5) * time.Minute
>> >     config.Metadata.Full = true
>> >     // config.Version = sarama.V0_10_0_0
>> >     config.Version = sarama.V1_1_0_0
>> >     // config.Version = sarama.V0_10_2_1
>> >     config.Consumer.Offsets.Initial = sarama.OffsetOldest
>> >
>> >     brokers := []string{"localhost:9092"}
>> >     // brokers := []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"}
>> >
>> >     client, err := sarama.NewConsumer(brokers, config)
>> >     if err != nil {
>> >         panic(err)
>> >     }
>> >
>> >     // topic := "topic1"
>> >     topic := "topic2"
>> >     // topic := "metric-influx-measurement"
>> >     // How to decide partition, is it fixed value...?
>> >     consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest)
>> >     if err != nil {
>> >         panic(err)
>> >     }
>> >
>> >     defer func() {
>> >         if err := client.Close(); err != nil {
>> >             panic(err)
>> >         }
>> >     }()
>> >
>> >     // Count how many message processed
>> >     msgCount := 0
>> >
>> >     go func() {
>> >         for {
>> >             select {
>> >             case err := <-consumer.Errors():
>> >                 fmt.Println(err)
>> >             case msg := <-consumer.Messages():
>> >                 msgCount++
>> >                 fmt.Println(msg.Timestamp)
>> >                 fmt.Println("Received messages", string(msg.Key), string(msg.Value))
>> >             case <-signals:
>> >                 fmt.Println("Interrupt is detected")
>> >                 break
>> >             }
>> >         }
>> >     }()
>> >     <-signals
>> > }
>> >
>> >
>> > On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
>> >
>> > > Hey Craig,
>> > >
>> > > what exact problem you have with Sarama client?
>> > >
>> > > On Mon, Jul 23, 2018 at 5:11 PM Craig Ching <craigch...@gmail.com> wrote:
>> > >
>> > > > Hi!
>> > > >
>> > > > I'm working on debugging a problem with how message timestamps are handled
>> > > > in the sarama client. In some cases, the sarama client won't associate a
>> > > > timestamp with a message while the kafka console consumer does. I've found
>> > > > the documentation on the message format here:
>> > > >
>> > > > https://kafka.apache.org/documentation/#messageformat
>> > > >
>> > > > But the information there is very sparse. For instance, what are
>> > > > 'firstTimestamp' and 'maxTimestamp'? It seems that when I'm debugging
>> > > > sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
>> > > > timestamp I want. Is there some state about the message that I need to
>> > > > understand in order to have maxTimestamp be used? Any further
>> > > > documentation or guidance on this would be very helpful!
>> > > >
>> > > > On another note, I am trying to debug this through the scala/java console
>> > > > consumer, but I'm having a hard time getting IntelliJ setup. Is there
>> > > > anything special or documentation I need to set this up for debugging?
>> > > >
>> > >
>> >