Hi Dmitry, Are you associated with the Sarama project? If so, understand that part of what I want is to learn about Sarama and the Kafka message format ;)
The problem I'm having is that if I turn on: log.message.timestamp.type=LogAppendTime in the broker, then produce on topic1 with console producer, I will see timestamps in the sarama client. If I produce on topic2 with telegraf (incidentally, I think telegraf is a sarama producer), then I don't see timestamps in the sarama client. In both cases, if I consume using the console consumer (with --property print.timestamp=true) I *do* see timestamps. I'm happy to debug this issue myself and submit a PR to sarama, but I am missing some fundamentals of how to decode the kafka message format and would really like some pointers. Cheers, Craig P.S. Here is the sarama code I'm using to test: package main import ( "fmt" "log" "os" "os/signal" "time" "github.com/Shopify/sarama" ) func main() { // Initialize Sarama logging sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.Ldate|log.Lmicroseconds|log.Lshortfile) signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) config := sarama.NewConfig() config.Consumer.Return.Errors = true config.ClientID = "consumer-test" config.Metadata.RefreshFrequency = time.Duration(5) * time.Minute config.Metadata.Full = true // config.Version = sarama.V0_10_0_0 config.Version = sarama.V1_1_0_0 // config.Version = sarama.V0_10_2_1 config.Consumer.Offsets.Initial = sarama.OffsetOldest brokers := []string{"localhost:9092"} // brokers := []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"} client, err := sarama.NewConsumer(brokers, config) if err != nil { panic(err) } // topic := "topic1" topic := "topic2" // topic := "metric-influx-measurement" // How to decide partition, is it fixed value...? consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest) if err != nil { panic(err) } defer func() { if err := client.Close(); err != nil { panic(err) } }() // Count how many message processed msgCount := 0 go func() { for { select { case err := <-consumer.Errors(): fmt.Println(err) case msg := <-consumer.Messages(): msgCount++ fmt.Println(msg.Timestamp) fmt.Println("Received messages", string(msg.Key), string(msg.Value)) case <-signals: fmt.Println("Interrupt is detected") break } } }() <-signals } On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote: > Hey Craig, > > what exact problem you have with Sarama client? > > On Mon, Jul 23, 2018 at 5:11 PM Craig Ching <craigch...@gmail.com> wrote: > > > Hi! > > > > I'm working on debugging a problem with how message timestamps are > handled > > in the sarama client. In some cases, the sarama client won't associate a > > timestamp with a message while the kafka console consumer does. I've > found > > the documentation on the message format here: > > > > https://kafka.apache.org/documentation/#messageformat > > > > But the information there is very sparse. For instance, what are > > 'firstTimestamp' and 'maxTimestamp'? It seems that when I'm debugging > > sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the > > timestamp I want. Is there some state about the message that I need to > > understand in order to have maxTimestamp be used? Any further > > documentation or guidance on this would be very helpful! > > > > On another note, I am trying to debug this through the scala/java console > > consumer, but I'm having a hard time getting IntelliJ setup. Is there > > anything special or documentation I need to set this up for debugging? > > >