Hey, thanks for that Dmitriy! I'll have a look. On Tue, Jul 24, 2018 at 11:18 AM Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> Not really associated with Sarama. > > But your issues sounds pretty much same i faced some time ago and fixed, > here it is: https://github.com/Shopify/sarama/issues/885 > > Try using msg.BlockTimestamp instead of msg.Timestamp and see if it helps. > > On Tue, Jul 24, 2018 at 3:26 AM Craig Ching <craigch...@gmail.com> wrote: > > > Hi Dmitry, > > > > Are you associated with the Sarama project? If so, understand that part > of > > what I want is to learn about Sarama and the Kafka message format ;) > > > > The problem I'm having is that if I turn on: > > > > log.message.timestamp.type=LogAppendTime > > > > in the broker, then produce on topic1 with console producer, I will see > > timestamps in the sarama client. If I produce on topic2 with telegraf > > (incidentally, I think telegraf is a sarama producer), then I don't see > > timestamps in the sarama client. In both cases, if I consume using the > > console consumer (with --property print.timestamp=true) I *do* see > > timestamps. > > > > I'm happy to debug this issue myself and submit a PR to sarama, but I am > > missing some fundamentals of how to decode the kafka message format and > > would really like some pointers. > > > > Cheers, > > Craig > > > > P.S. Here is the sarama code I'm using to test: > > > > package main > > > > import ( > > "fmt" > > "log" > > "os" > > "os/signal" > > "time" > > > > "github.com/Shopify/sarama" > > ) > > > > func main() { > > > > // Initialize Sarama logging > > sarama.Logger = log.New(os.Stdout, "[Sarama] ", > > log.Ldate|log.Lmicroseconds|log.Lshortfile) > > > > signals := make(chan os.Signal, 1) > > signal.Notify(signals, os.Interrupt) > > > > config := sarama.NewConfig() > > config.Consumer.Return.Errors = true > > config.ClientID = "consumer-test" > > config.Metadata.RefreshFrequency = time.Duration(5) * time.Minute > > config.Metadata.Full = true > > // config.Version = sarama.V0_10_0_0 > > config.Version = sarama.V1_1_0_0 > > // config.Version = sarama.V0_10_2_1 > > config.Consumer.Offsets.Initial = sarama.OffsetOldest > > > > brokers := []string{"localhost:9092"} > > // brokers := > > > > > []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"} > > > > client, err := sarama.NewConsumer(brokers, config) > > if err != nil { > > panic(err) > > } > > > > // topic := "topic1" > > topic := "topic2" > > // topic := "metric-influx-measurement" > > // How to decide partition, is it fixed value...? > > consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest) > > if err != nil { > > panic(err) > > } > > > > defer func() { > > if err := client.Close(); err != nil { > > panic(err) > > } > > }() > > > > // Count how many message processed > > msgCount := 0 > > > > go func() { > > for { > > select { > > case err := <-consumer.Errors(): > > fmt.Println(err) > > case msg := <-consumer.Messages(): > > msgCount++ > > fmt.Println(msg.Timestamp) > > fmt.Println("Received messages", string(msg.Key), string(msg.Value)) > > case <-signals: > > fmt.Println("Interrupt is detected") > > break > > } > > } > > }() > > <-signals > > } > > > > > > On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov < > > dvsekhval...@gmail.com> > > wrote: > > > > > Hey Craig, > > > > > > what exact problem you have with Sarama client? > > > > > > On Mon, Jul 23, 2018 at 5:11 PM Craig Ching <craigch...@gmail.com> > > wrote: > > > > > > > Hi! > > > > > > > > I'm working on debugging a problem with how message timestamps are > > > handled > > > > in the sarama client. In some cases, the sarama client won't > > associate a > > > > timestamp with a message while the kafka console consumer does. I've > > > found > > > > the documentation on the message format here: > > > > > > > > https://kafka.apache.org/documentation/#messageformat > > > > > > > > But the information there is very sparse. For instance, what are > > > > 'firstTimestamp' and 'maxTimestamp'? It seems that when I'm > debugging > > > > sarama, firstTimestamp is set to -1 and maxTimestamp appears to be > the > > > > timestamp I want. Is there some state about the message that I need > to > > > > understand in order to have maxTimestamp be used? Any further > > > > documentation or guidance on this would be very helpful! > > > > > > > > On another note, I am trying to debug this through the scala/java > > console > > > > consumer, but I'm having a hard time getting IntelliJ setup. Is > there > > > > anything special or documentation I need to set this up for > debugging? > > > > > > > > > >