Not really associated with Sarama.

But your issues sounds pretty much same i faced some time ago and fixed,
here it is: https://github.com/Shopify/sarama/issues/885

Try using msg.BlockTimestamp instead of msg.Timestamp and see if it helps.

On Tue, Jul 24, 2018 at 3:26 AM Craig Ching <craigch...@gmail.com> wrote:

> Hi Dmitry,
>
> Are you associated with the Sarama project?  If so, understand that part of
> what I want is to learn about Sarama and the Kafka message format ;)
>
> The problem I'm having is that if I turn on:
>
> log.message.timestamp.type=LogAppendTime
>
> in the broker, then produce on topic1 with console producer, I will see
> timestamps in the sarama client.  If I produce on topic2 with telegraf
> (incidentally, I think telegraf is a sarama producer), then I don't see
> timestamps in the sarama client.  In both cases, if I consume using the
> console consumer (with --property print.timestamp=true) I *do* see
> timestamps.
>
> I'm happy to debug this issue myself and submit a PR to sarama, but I am
> missing some fundamentals of how to decode the kafka message format and
> would really like some pointers.
>
> Cheers,
> Craig
>
> P.S.  Here is the sarama code I'm using to test:
>
> package main
>
> import (
> "fmt"
> "log"
> "os"
> "os/signal"
> "time"
>
> "github.com/Shopify/sarama"
> )
>
> func main() {
>
> // Initialize Sarama logging
> sarama.Logger = log.New(os.Stdout, "[Sarama] ",
> log.Ldate|log.Lmicroseconds|log.Lshortfile)
>
> signals := make(chan os.Signal, 1)
> signal.Notify(signals, os.Interrupt)
>
> config := sarama.NewConfig()
> config.Consumer.Return.Errors = true
> config.ClientID = "consumer-test"
> config.Metadata.RefreshFrequency = time.Duration(5) * time.Minute
> config.Metadata.Full = true
> // config.Version = sarama.V0_10_0_0
> config.Version = sarama.V1_1_0_0
> // config.Version = sarama.V0_10_2_1
> config.Consumer.Offsets.Initial = sarama.OffsetOldest
>
> brokers := []string{"localhost:9092"}
> // brokers :=
>
> []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"}
>
> client, err := sarama.NewConsumer(brokers, config)
> if err != nil {
> panic(err)
> }
>
> // topic := "topic1"
> topic := "topic2"
> // topic := "metric-influx-measurement"
> // How to decide partition, is it fixed value...?
> consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest)
> if err != nil {
> panic(err)
> }
>
> defer func() {
> if err := client.Close(); err != nil {
> panic(err)
> }
> }()
>
> // Count how many message processed
> msgCount := 0
>
> go func() {
> for {
> select {
> case err := <-consumer.Errors():
> fmt.Println(err)
> case msg := <-consumer.Messages():
> msgCount++
> fmt.Println(msg.Timestamp)
> fmt.Println("Received messages", string(msg.Key), string(msg.Value))
> case <-signals:
> fmt.Println("Interrupt is detected")
> break
> }
> }
> }()
> <-signals
> }
>
>
> On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com>
> wrote:
>
> > Hey Craig,
> >
> > what exact problem you have with Sarama client?
> >
> > On Mon, Jul 23, 2018 at 5:11 PM Craig Ching <craigch...@gmail.com>
> wrote:
> >
> > > Hi!
> > >
> > > I'm working on debugging a problem with how message timestamps are
> > handled
> > > in the sarama client.  In some cases, the sarama client won't
> associate a
> > > timestamp with a message while the kafka console consumer does.  I've
> > found
> > > the documentation on the message format here:
> > >
> > > https://kafka.apache.org/documentation/#messageformat
> > >
> > > But the information there is very sparse.  For instance, what are
> > > 'firstTimestamp' and 'maxTimestamp'?  It seems that when I'm debugging
> > > sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
> > > timestamp I want.  Is there some state about the message that I need to
> > > understand in order to have maxTimestamp be used?  Any further
> > > documentation or guidance on this would be very helpful!
> > >
> > > On another note, I am trying to debug this through the scala/java
> console
> > > consumer, but I'm having a hard time getting IntelliJ setup.  Is there
> > > anything special or documentation I need to set this up for debugging?
> > >
> >
>

Reply via email to