Re: High latency for Kafka transactions

2020-11-13 Thread Gary Russell
Have you increased the producer linger.ms property [1] to ~1s? (The default is 0, 
so records are sent immediately; it generally shouldn't be more than a few ms, 
depending on how long your serialization takes.)

If not, perhaps your serialization (or compression) might be the problem; best 
to profile the app to see where the time is being spent.

[1]: https://kafka.apache.org/documentation/#linger.ms
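
For completeness, a sketch of the latency-relevant producer properties (the values shown are the defaults / illustrative, not a prescription for your workload):

```
# Producer settings that affect send latency
linger.ms=0            # default: send as soon as possible; ~1000 adds up to 1s of batching delay
batch.size=16384       # default batch ceiling in bytes; a batch is sent when full or when linger.ms expires
compression.type=none  # compression adds CPU time per batch before the send
```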

From: John Coleman 
Sent: Friday, November 13, 2020 4:09 AM
To: users@kafka.apache.org 
Subject: High latency for Kafka transactions

Hi,

We see very high latency, ~1 second, when we send a small number of small 
messages to Kafka. The whole group of messages is less than 1 KB. We are 
thinking it may be better to send the messages together as one message 
containing a collection of all the messages.

Is there something we can tweak to reduce this latency? We have 10 producers, 
and they publish to a topic with 5 consumers. It would be nice if we didn't 
have to change our message structure.

TIA
John


Re: High latency for Kafka transactions

2020-11-13 Thread Guozhang Wang
Hello John,

It's a bit hard to reason about your total produce traffic to the Kafka
brokers, e.g. how frequently do you send a `whole group of messages`? If it
is every 10ms, for example, that is about 100 sends (~100 KB) per second,
which is fairly heavy when each send is a transaction.


Guozhang

On Fri, Nov 13, 2020 at 1:09 AM John Coleman  wrote:

> Hi,
>
> We have very high latency ~1 second when we send a small number of small
> messages to Kafka. The whole group of messages is less than 1k. We are
> thinking better to just send the messages together as 1 message containing
> a collection of all the messages.
>
> Is there something that we can tweak to reduce this latency? We have 10
> producers and they publish to a topic with 5 consumers. Be nice if we don’t
> have to change our message structure.
>
> TIA
> John
>


-- 
-- Guozhang


Re: StreamBuilder#stream multiple topic and consumed api

2020-11-13 Thread John Roesler
Hi Uirco,

The method that doesn’t take Consumed will fall back to the configured “default 
serdes”. If you don’t have that config set, it will just keep them as byte 
arrays, which will probably give you an exception at runtime. You’ll probably 
want to use the Consumed argument to set your serdes.

I’m assuming from your question that the serialized data in your two topics 
have different formats, but that your serdes for them produce the same result 
type. That is, the resulting stream would have just one type in it.

I think you have two options, you can either create a “wrapper deserializer” 
that checks the input topic for each record and then delegates to the 
appropriate deserializer, or you can create two different streams for the two 
topics and then use the ‘merge’ operator to combine them into one stream.

Offhand, I don’t think there would be much of a performance difference among 
any of the options, so I’d suggest going for the approach that looks the 
cleanest to you.
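In case it helps, here is a rough sketch of the first option, the “wrapper 
deserializer”. To keep it self-contained I’ve declared a minimal stand-in for 
Kafka’s org.apache.kafka.common.serialization.Deserializer interface; the topic 
names and the two formats are invented for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Minimal stand-in for Kafka's Deserializer<T>, so this sketch compiles on
// its own; in a real app you would implement
// org.apache.kafka.common.serialization.Deserializer<T> instead.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Delegates to a per-topic deserializer, so one source stream can read
// topics with different wire formats that share a result type.
class TopicRoutingDeserializer<T> implements Deserializer<T> {
    private final Map<String, Deserializer<T>> delegates;

    TopicRoutingDeserializer(Map<String, Deserializer<T>> delegates) {
        this.delegates = delegates;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        Deserializer<T> d = delegates.get(topic);
        if (d == null) {
            throw new IllegalStateException("No deserializer for topic " + topic);
        }
        return d.deserialize(topic, data);
    }
}

public class WrapperDeserializerSketch {
    public static void main(String[] args) {
        // Two made-up formats that both produce a String result type.
        Deserializer<String> plain = (t, b) -> new String(b, StandardCharsets.UTF_8);
        Deserializer<String> upper = (t, b) -> new String(b, StandardCharsets.UTF_8).toUpperCase();

        TopicRoutingDeserializer<String> router = new TopicRoutingDeserializer<>(
                Map.of("topic-a", plain, "topic-b", upper));

        System.out.println(router.deserialize("topic-a", "hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(router.deserialize("topic-b", "hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

You would wrap this in a Serde and pass it via Consumed.with when building the 
multi-topic stream.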

Regarding the last question, I’m not aware of any plans, but if you think that 
would be a better API, you are welcome to propose it in a KIP!

I hope this helps,
John

On Thu, Nov 12, 2020, at 22:00, Uirco Aoi wrote:
> Has anyone used public synchronized <K, V> KStream<K, V> stream(final 
> Collection<String> topics)?
> My use case is that I have one stream but with two source inputs using 
> different serdes.
> It looks like the best way is to use this stream API, but I have some 
> concerns:
> 1. Currently the only API I can use is 
> public synchronized <K, V> KStream<K, V> stream(final 
> Collection<String> topics)
> Does this mean that after I get it, I have to deserialize the data myself? 
> And will that slow down performance, compared to using it with 
> Consumed.with?
> 2. Will it in the future support public synchronized <K, V> KStream<K, 
> V> stream(final Collection<TopicsWithConsumed<K, V>> topics)?
> class TopicsWithConsumed<K, V> {
>     final String topic;
>     final Consumed<K, V> consumed;
> }
>


Re: MirrorMaker 2 Reload Configuration

2020-11-13 Thread Péter Sinóros-Szabó
Hi,

I also tried stopping all instances of MM2, but that didn't help for me.
I had to stop all MM2 instances, delete the mm2-config and mm2-status
topics on the destination cluster, and start up all MM2 instances again.
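
For anyone finding this later, the deletion looked roughly like this (the exact
internal topic names depend on your cluster aliases, so list them first; the
broker address and alias below are placeholders):

```
# Find the MM2 internal topics on the destination cluster
kafka-topics.sh --bootstrap-server <dest-broker>:9092 --list | grep mm2

# Delete only the config and status topics
kafka-topics.sh --bootstrap-server <dest-broker>:9092 --delete --topic mm2-configs.<alias>.internal
kafka-topics.sh --bootstrap-server <dest-broker>:9092 --delete --topic mm2-status.<alias>.internal

# Do NOT delete the mm2-offsets topics - replication would restart from offset 0
```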

Peter


Connect: Setting topic creation config via file (vs. API)

2020-11-13 Thread Fabio.Hecht2
Hi all,

I’m trying to make use of the somewhat new functionality of Connect to create 
new topics on the fly, as described in KIP-158. My test is really simple and 
consists of setting the two required configurations below:

topic.creation.default.replication.factor=3
topic.creation.default.partitions=1

The issue is that it only works when I set them via Connect API (PUT …/config) 
after the connect cluster is up. Setting the same configs at startup via 
“worker.properties” file does not work.
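
For reference, the REST call that does work looks roughly like this (host, 
port, and connector name are placeholders, and the body must include the 
connector's full existing config):

```
curl -X PUT http://localhost:8083/connectors/<connector-name>/config \
  -H "Content-Type: application/json" \
  -d '{
        "connector.class": "<existing connector class>",
        "topic.creation.default.replication.factor": "3",
        "topic.creation.default.partitions": "1"
      }'
```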

Is this the intended behavior or is it a bug? Is there a known workaround for 
me to set those properties at startup?

Cheers,

Fabio


Re: MirrorMaker 2 Reload Configuration

2020-11-13 Thread Devaki, Srinivas
Hi All,

After inspecting a few internal topics and running a console consumer to
see the payloads in the mm2-configs topic, I identified that the properties
are indeed not getting refreshed. I assumed that MM2 was internally
joining the existing cluster, so to refresh the config I tried to
completely stop the MM2 cluster, i.e. reduce the MM2 deployment capacity
to 0, waited a couple of minutes, and then increased the capacity back to
our previous number.

With this approach MM2 started to load the configuration from
mm2.properties again.

Thanks

On Fri, Nov 13, 2020 at 4:02 PM Péter Sinóros-Szabó
 wrote:
>
> Hi Ryanne,
>
> I will open an issue in Jira.
> I see mm2-config and mm2-status topics on both the source and destination
> clusters.
> Should I purge all of them? Or is it enough to purge just the destination
> topics?
>
> Thanks,
> Peter
>
> On Wed, 11 Nov 2020 at 19:33, Ryanne Dolan  wrote:
>
> > Hey guys, this is because the configuration gets loaded into the internal
> > mm2-config topics, and these may get out of sync with the mm2.properties
> > file in some scenarios. I believe this occurs whenever an old/bad
> > configuration gets written to Kafka, which MM2 can read successfully but
> > which causes MM2 to get stuck before it can write any updates back to the
> > mm2-config topics. Just modifying the mm2.properties file does not resolve
> > the issue, since Workers read from the mm2-config topics, not the
> > mm2.properties file directly.
> >
> > The fix is to truncate or delete the mm2-config and mm2-status topics. N.B.
> > do _not_ delete the mm2-offsets topics, as this would cause MM2 to
> > restart replication from offset 0.
> >
> > I'm not sure why deleting these topics works, but it seems to cause Connect
> > to wait for the new configuration to be loaded from mm2.properties, rather
> > than reading the old configuration from mm2-config and getting stuck.
> >
> > Can someone report the issue in jira?
> >
> > Ryanne
> >
> > On Wed, Nov 11, 2020 at 9:35 AM Péter Sinóros-Szabó
> >  wrote:
> >
> > > Hi,
> > >
> > > I have a similar issue.  I changed the source cluster bootstrap address
> > and
> > > MM2 picked it up only partially. Some parts of it still use the old
> > > address, some the new. The old and the new address list is routed to the
> > > same cluster, same brokers, just on a different network path.
> > >
> > > So is there any way to force the configuration update?
> > >
> > > Cheers,
> > > Peter
> > >
> > > On Wed, 4 Nov 2020 at 18:39, Ning Zhang  wrote:
> > >
> > > > if your new topics are not named "topic1" or "topic2", maybe you want
> > to
> > > > use regex * to allow more topics to be considered by Mm2
> > > >
> > > > # regex which defines which topics gets replicated. For eg "foo-.*"
> > > > src-cluster->dst-cluster.topics = topic1,topic2
> > > >
> > > > On 2020/10/30 01:48:00, "Devaki, Srinivas" 
> > > > wrote:
> > > > > Hi Folks,
> > > > >
> > > > > I'm running mirror maker as a dedicated cluster as given in the
> > > > > mirrormaker 2 doc. but for some reason when I add new topics and
> > > > > deploy the mirror maker it's not detecting the new topics at all,
> > even
> > > > > the config dumps in the mirror maker startup logs don't show the
> > newly
> > > > > added topics.
> > > > >
> > > > > I've attached the config that I'm using, initially I assumed that
> > > > > there might be some refresh configuration option either in connect or
> > > > > mirror maker, but the connect rest api doesn't seem to be working in
> > > > > this mode and also couldn't find any refresh configuration option.
> > > > >
> > > > > Any ideas on this? Thank you in advance
> > > > >
> > > > > ```
> > > > > clusters = src-cluster, dst-cluster
> > > > >
> > > > > # disable topic prefixes
> > > > > src-cluster.replication.policy.separator =
> > > > > dst-cluster.replication.policy.separator =
> > > > > replication.policy.separator =
> > > > > source.cluster.alias =
> > > > > target.cluster.alias =
> > > > >
> > > > >
> > > > > # enable idempotence
> > > > > source.cluster.producer.enable.idempotence = true
> > > > > target.cluster.producer.enable.idempotence = true
> > > > >
> > > > > # connection information for each cluster
> > > > > # This is a comma separated host:port pairs for each cluster
> > > > > # for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
> > > > > src-cluster.bootstrap.servers =
> > > > >
> > > >
> > >
> > sng-kfnode1.internal:9092,sng-kfnode1.internal:9092,sng-kfnode1.internal:9092
> > > > > dst-cluster.bootstrap.servers =
> > > > >
> > > >
> > >
> > prod-online-v2-kafka-1.internal:9092,prod-online-v2-kafka-2.internal:9092,prod-online-v2-kafka-3.internal:9092,prod-online-v2-kafka-4.internal:9092,prod-online-v2-kafka-5.internal:9092
> > > > >
> > > > > # regex which defines which topics gets replicated. For eg "foo-.*"
> > > > > src-cluster->dst-cluster.topics = topic1,topic2
> > > > >
> > > > > # client-id
> > > > > src-cluster.client.id = prod-mm2-onlinev1-to-online

Re: MirrorMaker 2 Reload Configuration

2020-11-13 Thread Péter Sinóros-Szabó
Hi Ryanne,

I will open an issue in Jira.
I see mm2-config and mm2-status topics on both the source and destination
clusters.
Should I purge all of them? Or is it enough to purge just the destination
topics?

Thanks,
Peter

On Wed, 11 Nov 2020 at 19:33, Ryanne Dolan  wrote:

> Hey guys, this is because the configuration gets loaded into the internal
> mm2-config topics, and these may get out of sync with the mm2.properties
> file in some scenarios. I believe this occurs whenever an old/bad
> configuration gets written to Kafka, which MM2 can read successfully but
> which causes MM2 to get stuck before it can write any updates back to the
> mm2-config topics. Just modifying the mm2.properties file does not resolve
> the issue, since Workers read from the mm2-config topics, not the
> mm2.properties file directly.
>
> The fix is to truncate or delete the mm2-config and mm2-status topics. N.B.
> do _not_ delete the mm2-offsets topics, as this would cause MM2 to
> restart replication from offset 0.
>
> I'm not sure why deleting these topics works, but it seems to cause Connect
> to wait for the new configuration to be loaded from mm2.properties, rather
> than reading the old configuration from mm2-config and getting stuck.
>
> Can someone report the issue in jira?
>
> Ryanne
>
> On Wed, Nov 11, 2020 at 9:35 AM Péter Sinóros-Szabó
>  wrote:
>
> > Hi,
> >
> > I have a similar issue.  I changed the source cluster bootstrap address
> and
> > MM2 picked it up only partially. Some parts of it still use the old
> > address, some the new. The old and the new address list is routed to the
> > same cluster, same brokers, just on a different network path.
> >
> > So is there any way to force the configuration update?
> >
> > Cheers,
> > Peter
> >
> > On Wed, 4 Nov 2020 at 18:39, Ning Zhang  wrote:
> >
> > > if your new topics are not named "topic1" or "topic2", maybe you want
> to
> > > use regex * to allow more topics to be considered by Mm2
> > >
> > > # regex which defines which topics gets replicated. For eg "foo-.*"
> > > src-cluster->dst-cluster.topics = topic1,topic2
> > >
> > > On 2020/10/30 01:48:00, "Devaki, Srinivas" 
> > > wrote:
> > > > Hi Folks,
> > > >
> > > > I'm running mirror maker as a dedicated cluster as given in the
> > > > mirrormaker 2 doc. but for some reason when I add new topics and
> > > > deploy the mirror maker it's not detecting the new topics at all,
> even
> > > > the config dumps in the mirror maker startup logs don't show the
> newly
> > > > added topics.
> > > >
> > > > I've attached the config that I'm using, initially I assumed that
> > > > there might be some refresh configuration option either in connect or
> > > > mirror maker, but the connect rest api doesn't seem to be working in
> > > > this mode and also couldn't find any refresh configuration option.
> > > >
> > > > Any ideas on this? Thank you in advance
> > > >
> > > > ```
> > > > clusters = src-cluster, dst-cluster
> > > >
> > > > # disable topic prefixes
> > > > src-cluster.replication.policy.separator =
> > > > dst-cluster.replication.policy.separator =
> > > > replication.policy.separator =
> > > > source.cluster.alias =
> > > > target.cluster.alias =
> > > >
> > > >
> > > > # enable idempotence
> > > > source.cluster.producer.enable.idempotence = true
> > > > target.cluster.producer.enable.idempotence = true
> > > >
> > > > # connection information for each cluster
> > > > # This is a comma separated host:port pairs for each cluster
> > > > # for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
> > > > src-cluster.bootstrap.servers =
> > > >
> > >
> >
> sng-kfnode1.internal:9092,sng-kfnode1.internal:9092,sng-kfnode1.internal:9092
> > > > dst-cluster.bootstrap.servers =
> > > >
> > >
> >
> prod-online-v2-kafka-1.internal:9092,prod-online-v2-kafka-2.internal:9092,prod-online-v2-kafka-3.internal:9092,prod-online-v2-kafka-4.internal:9092,prod-online-v2-kafka-5.internal:9092
> > > >
> > > > # regex which defines which topics gets replicated. For eg "foo-.*"
> > > > src-cluster->dst-cluster.topics = topic1,topic2
> > > >
> > > > # client-id
> > > > src-cluster.client.id = prod-mm2-onlinev1-to-onlinev2-consumer-v0
> > > > dst-cluster.client.id = prod-mm2-onlinev1-to-onlinev2-producer-v0
> > > >
> > > >
> > > > # group.instance.id=_mirror_make_instance_1
> > > > # consumer should periodically emit heartbeats
> > > > src-cluster->dst-cluster.consumer.auto.offset.reset = earliest
> > > > src-cluster->dst-cluster.consumer.overrides.auto.offset.reset =
> > earliest
> > > >
> > > > # connector should periodically emit heartbeats
> > > > src-cluster->dst-cluster.emit.heartbeats.enabled = true
> > > >
> > > > # frequency of heartbeats, default is 5 seconds
> > > > src-cluster->dst-cluster.emit.heartbeats.interval.seconds = 10
> > > >
> > > > # connector should periodically emit consumer offset information
> > > > src-cluster->dst-cluster.emit.checkpoints.enabled = true
> > > >
> > > > # frequency of checkpoints, default is 5 seconds
>

High latency for Kafka transactions

2020-11-13 Thread John Coleman
Hi,

We see very high latency, ~1 second, when we send a small number of small 
messages to Kafka. The whole group of messages is less than 1 KB. We are 
thinking it may be better to send the messages together as one message 
containing a collection of all the messages.

Is there something we can tweak to reduce this latency? We have 10 producers, 
and they publish to a topic with 5 consumers. It would be nice if we didn't 
have to change our message structure.

TIA
John