Re: Long delay between incoming and outgoing messages using kafka streams

2019-10-09 Thread Sophie Blee-Goldman
Hi Petter,

I'd recommend turning off caching by setting
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

2.3.0 also has some known performance issues, but they shouldn't be noticeable
if you turn caching off and aren't reading from or writing to topics with a
very high partition count. They are fixed in 2.3.1, which should be released
soon enough for you to upgrade, but the caching is likely the main reason for
the latency you see.
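
Record caches hold aggregation updates and only forward them downstream when
the cache fills up or is flushed, so with the cache turned off each update is
forwarded as soon as it is processed. A minimal sketch of the two settings
discussed in this thread (the "p" properties object is the one from your
snippet below):

  p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0)  // turn off record caches
  p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000)      // commit once a second, as you already do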

I'd also note that Streams, and Kafka in general, is typically tuned for
high
throughput rather than low latency, so I wouldn't be too concerned about
a large latency unless that is a specific requirement.

Cheers,
Sophie

On Wed, Oct 9, 2019 at 6:05 AM Petter Arvidsson 
wrote:

> Hi,
>
> I have a fairly simple Kafka Streams application that reads messages from
> two topics. The problem I am facing is that the delay between sending
> events to the streams application and it producing results is very high (as
> in several minutes). My question is: how can I make this latency smaller?
>
> The streams application does the following:
> ktable1 = topic1
>   -> (filter out messages using flatMap)
>   -> groupBy (with new key, adds internal rekeying topic)
>   -> aggregate (in memory store backed by internal compacted topic)
>
> ktable2 = topic2
>   -> (rekey to same key as ktable1 over internal topic)
>   -> join (with ktable1)
>   -> aggregate (in memory store backed by internal compacted topic)
>
> ktable2.toStream.to(topic2)
>
> Ktable1 keeps configuration that allows messages to pass through and be
> aggregated into ktable2. Ktable2 keeps aggregates based on messages on
> topic2. Ktable2.toStream is then used to put the aggregated messages back
> out on topic2. The "problem" (or misunderstanding as to how Kafka Streams is
> processing messages) is that the delay from sending a message on topic1 to
> the point where messages received on topic2 are passing the join is several
> minutes. With the settings I have (see below) on a not that heavily loaded
> system, I would assume the latency would be a couple of seconds (based on
> the COMMIT_INTERVAL_MS_CONFIG).
>
> I use the following settings (as well as settings for bootstrap servers,
> application id and so forth):
> p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000)
> p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
>
> The store used for the KTables is the one returned by
> "Stores.inMemoryKeyValueStore()".
>
> Kafka libraries use version "2.3.0" and the "kafka-streams-scala" scaladsl
> is used to build the streams. The broker is using version "1.1.0".
>
> Best regards,
> Petter
>


Re: Brokers occasionally dropping out of cluster

2019-10-09 Thread M. Manna
Hello Peter,

Have you tried setting a higher value for the connection timeout?

I am running 2.3.0 with 30s for zk sessions and 90s for zk connection.
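
In server.properties terms that corresponds to something like the following
(these values mirror my setup and are only illustrative starting points, not
recommendations):

  zookeeper.session.timeout.ms=30000
  zookeeper.connection.timeout.ms=90000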

I haven't checked 2.3.1 yet; it looks like you may have found something worth
checking before upgrading.

Regards,

On Tue, 8 Oct 2019 at 21:41, Peter Bukowinski  wrote:

> Greetings,
>
> I’m experiencing a concerning issue with brokers dropping out of kafka
> clusters, and I suspect it may be due to zookeeper timeouts. I have many
> clusters running kafka 2.3.1 and have seen this issue on more than a few,
> though this issue predates this version.
>
> The clusters use zookeeper.session.timeout.ms=3, and
> zookeeper.connection.timeout.ms is unset.
>
> This is what I see in the log of broker 14 before and after the broker has
> been kicked out of its cluster:
>
> [2019-10-07 11:02:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52,
> dir=/data/3/kl] Found deletable segments with base offsets [1975332] due to
> retention time 360ms breach (kafka.log.Log)
> [2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52,
> dir=/data/3/kl] Scheduling log segment [baseOffset 1975332, size 92076008]
> for deletion. (kafka.log.Log)
> [2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52,
> dir=/data/3/kl] Incrementing log start offset to 2000317 (kafka.log.Log)
> [2019-10-07 11:03:56,936] INFO [Log partition=internal_test-52,
> dir=/data/3/kl] Deleting segment 1975332 (kafka.log.Log)
> [2019-10-07 11:03:56,957] INFO Deleted log
> /data/3/kl/internal_test-52/01975332.log.deleted.
> (kafka.log.LogSegment)
> [2019-10-07 11:03:56,957] INFO Deleted offset index
> /data/3/kl/internal_test-52/01975332.index.deleted.
> (kafka.log.LogSegment)
> [2019-10-07 11:03:56,958] INFO Deleted time index
> /data/3/kl/internal_test-52/01975332.timeindex.deleted.
> (kafka.log.LogSegment)
> [2019-10-07 11:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 11:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 11:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 11:42:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 11:52:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:02:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:42:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 1 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:52:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:02:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:12:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:42:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:52:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 1 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 14:02:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> 

Long delay between incoming and outgoing messages using kafka streams

2019-10-09 Thread Petter Arvidsson
Hi,

I have a fairly simple Kafka Streams application that reads messages from
two topics. The problem I am facing is that the delay between sending
events to the streams application and it producing results is very high (as
in several minutes). My question is: how can I make this latency smaller?

The streams application does the following:
ktable1 = topic1
  -> (filter out messages using flatMap)
  -> groupBy (with new key, adds internal rekeying topic)
  -> aggregate (in memory store backed by internal compacted topic)

ktable2 = topic2
  -> (rekey to same key as ktable1 over internal topic)
  -> join (with ktable1)
  -> aggregate (in memory store backed by internal compacted topic)

ktable2.toStream.to(topic2)
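
Roughly, in scaladsl terms the topology is built along these lines (the key
selectors, aggregation functions and store names below are simplified
placeholders, not the real code):

  import org.apache.kafka.streams.scala.ImplicitConversions._
  import org.apache.kafka.streams.scala.Serdes._
  import org.apache.kafka.streams.scala.StreamsBuilder
  import org.apache.kafka.streams.scala.kstream.Materialized
  import org.apache.kafka.streams.state.Stores

  val builder = new StreamsBuilder()

  val ktable1 = builder
    .stream[String, String]("topic1")
    .flatMap((k, v) => if (v.nonEmpty) List((k, v)) else Nil) // placeholder filter
    .groupBy((_, v) => v.take(8))                   // rekey; adds internal repartition topic
    .aggregate("")((_, v, agg) => agg + v)(         // in-memory store, backed by changelog
      Materialized.as(Stores.inMemoryKeyValueStore("config-agg")))

  val ktable2 = builder
    .stream[String, String]("topic2")
    .selectKey((_, v) => v.take(8))                 // rekey to ktable1's key over internal topic
    .join(ktable1)((event, config) => config + ":" + event)
    .groupByKey
    .aggregate("")((_, v, agg) => agg + v)(
      Materialized.as(Stores.inMemoryKeyValueStore("event-agg")))

  ktable2.toStream.to("topic2")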

Ktable1 keeps configuration that allows messages to pass through and be
aggregated into ktable2. Ktable2 keeps aggregates based on messages on
topic2. Ktable2.toStream is then used to put the aggregated messages back
out on topic2. The "problem" (or misunderstanding as to how Kafka Streams is
processing messages) is that the delay from sending a message on topic1 to
the point where messages received on topic2 are passing the join is several
minutes. With the settings I have (see below) on a not that heavily loaded
system, I would assume the latency would be a couple of seconds (based on
the COMMIT_INTERVAL_MS_CONFIG).

I use the following settings (as well as settings for bootstrap servers,
application id and so forth):
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000)
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)

The store used for the KTables is the one returned by
"Stores.inMemoryKeyValueStore()".

Kafka libraries use version "2.3.0" and the "kafka-streams-scala" scaladsl
is used to build the streams. The broker is using version "1.1.0".

Best regards,
Petter


LogAppendTime handling in consumer

2019-10-09 Thread Jan Hruban
Hi,

I have a Kafka topic configured with:

  message.timestamp.type=LogAppendTime

I'm using the "brod" [1] Kafka client and I have noticed that it does
return the CreateTime instead of LogAppendTime when fetching the
messages.

I have tracked down that the "kafka_protocol" library (used by the brod
client) always uses the firstTimestamp from the Record Batch and
timestampDelta from the Record to compute each record's timestamp [2].
This always gives the CreateTime.

In the official Java client, it looks like that when LogAppendTime is in
effect (determined by the attribute timestampType in the Record Batch),
it uses the maxTimestamp from the Record Batch [3] to set the timestamp
in each Record [4].
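
In other words, the rule the Java client appears to apply is roughly the
following (these case classes are ad-hoc stand-ins for illustration, not the
actual wire-format structures):

  final case class BatchHeader(firstTimestamp: Long,
                               maxTimestamp: Long,
                               isLogAppendTime: Boolean)
  final case class RecordEntry(timestampDelta: Long)

  def recordTimestamp(batch: BatchHeader, record: RecordEntry): Long =
    if (batch.isLogAppendTime) batch.maxTimestamp          // broker-assigned, same for the whole batch
    else batch.firstTimestamp + record.timestampDelta      // producer CreateTime, per record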

Is this exactly the behaviour that clients are expected to follow?
I've come across several resources which gave me a few hints:

  * KIP-32 [5], which just talks about the Message format with magic < 2.

  * KAFKA-5353 [6], which changed the baseTimeStamp to always be the
create timestamp.

On the other hand, the documentation does not give a clue that clients
should use the maxTimestamp when LogAppendTime is in use:

  * The Record Batch documentation [7] does not explain the semantics of the
individual fields.

  * Wiki page "A Guide To The Kafka Protocol" [8] is more detailed on
the FirstTimestamp, TimestampDelta and MaxTimestamp, but does not
mention what implications the timestamp type has on those fields.

From my point of view, this is either a deficiency in Kafka, which should
instead always provide the correct, authoritative timestamp to consumers, or,
if clients are indeed expected to handle this logic, it should be explicitly
stated in the official documentation.


For the record, here's a pull request [9] to the kafka_protocol
library.




[1] https://github.com/klarna/brod

[2] https://github.com/klarna/kafka_protocol/blob/cc13902191b9ca3970a65388697c1069ae68fd2a/src/kpro_batch.erl#L249

[3] https://github.com/apache/kafka/blob/1f1179ea64bbaf068d759aae988bd2a6fe966161/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L558

[4] https://github.com/apache/kafka/blob/1f1179ea64bbaf068d759aae988bd2a6fe966161/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L330-L331

[5] https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message

[6] https://issues.apache.org/jira/browse/KAFKA-5353

[7] http://kafka.apache.org/documentation/#recordbatch

[8] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets

[9] https://github.com/klarna/kafka_protocol/pull/60


-- 
Jan Hruban

