Re: Reading from lagging replicas

2021-09-20 Thread Michał Łowicki
Thanks. I'm aware of ISR, but I'm looking for the exact logic inside the
Java client for handling the case when the follower replica a consumer is
reading from starts lagging / goes out of sync.
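As I understand it (worth double-checking in the broker sources), follower fetching via KIP-392 works by the partition leader advertising a "preferred read replica" in fetch responses, chosen by the broker's replica selector from the in-sync replicas, so a follower that drops out of the ISR stops being nominated and the consumer falls back on its next metadata / fetch round trip. A minimal sketch of the settings involved (the config keys are real; every value below is a placeholder I've assumed):

```java
import java.util.Properties;

// Sketch of the KIP-392 knobs; config keys are real, values are placeholders.
public class FollowerFetchConfig {

    // Consumer side: declare which rack / AZ this client lives in so the
    // leader can nominate a nearby in-sync follower as preferred read replica.
    public static Properties consumerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "broker-1:9092");   // placeholder
        p.put("group.id", "example-group");            // placeholder
        p.put("client.rack", "us-east-1a");            // must match a broker.rack value
        return p;
    }

    // Broker side: plug in the rack-aware selector (the default selector
    // always returns the leader, i.e. no follower fetching at all).
    public static Properties brokerProps() {
        Properties p = new Properties();
        p.put("broker.rack", "us-east-1a");            // placeholder
        p.put("replica.selector.class",
              "org.apache.kafka.common.replica.RackAwareReplicaSelector");
        return p;
    }
}
```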

On Mon, Sep 20, 2021 at 2:52 PM Luke Chen  wrote:

> Hi Michał,
> I think you want to check the term in Kafka: In Sync Replicas (a.k.a ISR).
> You might want to check the "Replication" section in doc here
> <https://kafka.apache.org/documentation/#replication>.
>
> The configuration is this one: *replica.lag.time.max.ms*
>
> https://kafka.apache.org/documentation/#brokerconfigs_replica.lag.time.max.ms
>
> Hope that helps.
>
> Thank you.
> Luke
>
> On Mon, Sep 20, 2021 at 8:30 PM Michał Łowicki  wrote:
>
> > Hey,
> >
> > When a Java client is reading from a replica, how is the case handled
> > when that replica starts to lag (i.e. struggles to replicate data from
> > the leader)? Does the consumer, coordinator or group leader have any
> > logic to detect such cases and switch over to another replica? (Pointers
> > to code and configuration options would be awesome.) Thanks in advance.
> >
> > --
> > BR,
> > Michał Łowicki
> >
>


-- 
BR,
Michał Łowicki


Reading from lagging replicas

2021-09-20 Thread Michał Łowicki
Hey,

When a Java client is reading from a replica, how is the case handled when
that replica starts to lag (i.e. struggles to replicate data from the
leader)? Does the consumer, coordinator or group leader have any logic to
detect such cases and switch over to another replica? (Pointers to code and
configuration options would be awesome.) Thanks in advance.

-- 
BR,
Michał Łowicki


How is __consumer_offsets partitioned?

2021-09-03 Thread Michał Łowicki
Hey,

Could someone please point me to the code / docs where I can find what is
used as the key and how messages are spread across partitions for this
internal topic?
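For what it's worth, my understanding (worth verifying against GroupMetadataManager in the broker sources) is that commit records are keyed by (group, topic, partition), while the partition of __consumer_offsets is chosen by hashing the group id alone, so one group's offsets always land in a single partition. A sketch of that mapping:

```java
// Sketch of the mapping I believe the group coordinator uses (see
// GroupMetadataManager#partitionFor in the broker sources): the partition
// choice hashes only the group id; 50 is the default value of the
// offsets.topic.num.partitions broker config.
public class ConsumerOffsetsPartitioner {
    static int partitionFor(String groupId, int numPartitions) {
        // Kafka masks the sign bit instead of using Math.abs so that an
        // Integer.MIN_VALUE hash code still yields a valid partition.
        return (groupId.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```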

-- 
BR,
Michał Łowicki


Possible race in handling IP address change

2021-05-19 Thread Michał Łowicki
Hey,

We've experienced a few incidents where Kafka producers suddenly weren't
able to send messages and were simply timing out. After some investigation
I have a repro now. We're running a cluster of 3 brokers on K8s, and it's
enough to kill the underlying AWS VM using the AWS Console, which also
causes an IP change (in our case 10.128.132.196 → 10.128.135.90). It needs
to be the VM hosting the broker which is the leader of the partition (node 1).

Slightly before the kill there were 2 events:

About to close the idle connection from 10001 due to being idle for 38545
> millis
> About to close the idle connection from 10002 due to being idle for 34946
> millis


And then client picks the node to connect to:

Removing node 10.128.140.202:9092 (id: 10002 rack: us-east-1c) from least
> loaded node selection since it is neither ready for sending or connecting
> Found least loaded connecting node 10.128.132.196:9092 (id: 1 rack:
> us-east-1a)
> About to close the idle connection from 1 due to being idle for 30529
> millis
> Node 1 disconnected.
> Initiating connection to node 10.128.132.196:9092 (id: 1 rack:
> us-east-1a) using address /10.128.132.196


and then it repeats:

Found least loaded connecting node 10.128.132.196:9092 (id: 1 rack:
> us-east-1a)
> About to close the idle connection from 1 due to being idle for 30027
> millis
> Node 1 disconnected.
> Initiating connection to node 10.128.132.196:9092 (id: 1 rack:
> us-east-1a) using address /10.128.132.196


Affected clients never request METADATA from the other brokers (10001 or
10002) to discover the new IP. Not all clients are affected; some handle the
change gracefully, and those log via:

log.info("Hostname for node {} changed from {} to {}.", id,
> connectionState.host(), host);


-- 
BR,
Michał Łowicki


Re: Is bootstrap.servers resolved only once?

2021-05-17 Thread Michał Łowicki
On Mon, May 17, 2021 at 2:25 PM Shilin Wu  wrote:

> Bootstrap servers are just used for initial connection, clients will get
> all server metadata from one of the bootstrap servers to discover the full
> cluster membership (which may change dynamically), this list does not have
> to contain the full set of servers (you may want more than one, though, in
> case a server that used for bootstrapping is down).
>
> Clients (producers or consumers) make use of all servers irrespective of
> which servers are specified in bootstrap.servers for bootstrapping.
>
> In case of cluster change (e.g. new ip addresses for new servers), the
> clients will receive updates from the server group.
>
> To answer your question:
> 1. Does it mean that if *bootstrap.servers* contains only one address which
> resolves to multiple IP addresses then still only one IP will be taken into
> account during bootstrap?
> => It shouldn't matter. As long as it resolves at least one functioning
> broker, it will work. Make sure the advertised listeners are configured
> correctly and are externally accessible.
> All actual server meta data are retrieved from the functioning broker and
> updated accordingly.
>

AFAIU it can matter in one (probably minor) scenario: KafkaProducer resolves
the DNS name to just one IP (first / random), and if that node dies before
the producer connects to it to fetch cluster metadata, no other IP is tried.
If it resolved all of them, it could try the others. Currently KafkaProducer
just gives up, though it's easy to add retry logic in the application.
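To illustrate the difference (a simplified sketch, not the actual ClientUtils code): with "default" lookup only a single resolved address survives, while "use_all_dns_ips" keeps every address the name resolves to:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Simplified sketch of per-bootstrap-address resolution (not the actual
// client code): "default" keeps one address, "use_all_dns_ips" keeps all.
public class BootstrapResolve {
    static InetAddress[] resolve(String host, boolean useAllDnsIps) {
        try {
            InetAddress[] all = InetAddress.getAllByName(host);
            return useAllDnsIps ? all : new InetAddress[] { all[0] };
        } catch (UnknownHostException e) {
            throw new IllegalStateException("cannot resolve " + host, e);
        }
    }
}
```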


>
> 2. What happens in the catastrophic scenario where all brokers die and get
> different IPs after restart? Will *bootstrap.servers* ever be re-evaluated
> so that clients can re-connect to the cluster, assuming it resolves to the
> new IPs?
> => If you set client.dns.lookup="use_all_dns_ips" in your client
> configuration, it will use all of the IP addresses returned by DNS, not
> just the first (or a random one).
>
> What if all the IP addresses are changed, and you still have the same DNS
> name, but it points to completely new host IPs?
>
> From
>
> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
> ,
> it seems that the address resolution for bootstrap server is done only once
> in the constructor. So clients may have to be restarted in this case.
> See line 413.
>
> However, if you maintain an active cluster in a controlled change process
> (not due to a fault), and restart the servers and change ip addresses one
> by one,
> it should be possible to change the ip address gradually as the update of
> metadata should happen in a timely manner.
>
> In short, don't kill them all and restart them at the same time with new
> addresses.
>
> After all, you can always add an extra layer of retry loop to re-create the
> producer with the same producer config, if Java dns cache TTL expires, it
> may pick up new addresses eventually. This may take long though.
>

Yeah, we had a scenario where, because of human error, the cluster on K8s
was wiped out and recreated quickly, but all IPs changed and consumers /
producers couldn't reconnect without being restarted. This can still be
patched in the app, e.g. by checking for n consecutive send failures and
recreating the producer then.
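A generic sketch of that workaround (nothing here is Kafka API; the client type parameter stands in for KafkaProducer, which re-resolves bootstrap.servers when constructed):

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Generic sketch: after N consecutive send failures the wrapped client is
// rebuilt via its factory. For a KafkaProducer the factory would call
// `new KafkaProducer<>(props)`, which re-resolves bootstrap.servers.
public class RecreatingClient<C> {
    private final Supplier<C> factory;
    private final int maxConsecutiveFailures;
    private C client;
    private int failures = 0;

    public RecreatingClient(Supplier<C> factory, int maxConsecutiveFailures) {
        this.factory = factory;
        this.maxConsecutiveFailures = maxConsecutiveFailures;
        this.client = factory.get();
    }

    // Runs `send` against the current client; on repeated failure the
    // client is replaced before rethrowing, so the next call starts fresh.
    public <R> R send(Function<C, R> send) {
        try {
            R result = send.apply(client);
            failures = 0;
            return result;
        } catch (RuntimeException e) {
            if (++failures >= maxConsecutiveFailures) {
                client = factory.get();
                failures = 0;
            }
            throw e;
        }
    }
}
```

(A real implementation would also close the old producer before replacing it.)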


>
>
>
>
> Wu Shilin
> Solution Architect, Confluent
>
>
> On Mon, May 17, 2021 at 8:03 PM Michał Łowicki  wrote:
>
> > Hey,
> >
> > Trying to understand how *bootstrap.servers* is handled for
> KafkaProducer.
> > I see that it's processed during creating of producer (here
> > <
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L414
> > >)
> > and later if client DNS lookup is set to "default" it's being resolved to
> > only one IP address (here
> > <
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java#L72
> > >
> > ).
> >
> > 1. Does it mean that if *bootstrap.servers* contains only one ad

Is bootstrap.servers resolved only once?

2021-05-17 Thread Michał Łowicki
Hey,

Trying to understand how *bootstrap.servers* is handled by KafkaProducer.
I see that it's processed during creation of the producer (here
<https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L414>)
and later, if client DNS lookup is set to "default", it's resolved to
only one IP address (here
<https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java#L72>
).

1. Does it mean that if *bootstrap.servers* contains only one address which
resolves to multiple IP addresses then still only one IP will be taken into
account during bootstrap?
2. What happens in the catastrophic scenario where all brokers die and get
different IPs after restart? Will *bootstrap.servers* ever be re-evaluated
so that clients can re-connect to the cluster, assuming it resolves to the
new IPs?

-- 
BR,
Michał Łowicki


Re: Java client 2.4.1 doesn't always handle IP changes

2021-05-14 Thread Michał Łowicki
Checked Java DNS resolution caching:

```
sun.net.InetAddressCachePolicy.get();
sun.net.InetAddressCachePolicy.getNegative();
```

and those return 30 and 10 (seconds) respectively, so that part seems fine;
the JVM shouldn't cache entries for too long.
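The same values can be read through the supported java.security API instead of the internal sun.net class (a sketch; both properties are typically unset, in which case the JVM falls back to its built-in defaults, e.g. 30s positive / 10s negative when no security manager is installed):

```java
import java.security.Security;

// Supported alternative to sun.net.InetAddressCachePolicy for checking the
// JVM's DNS cache settings. A null return means the property is unset and
// the built-in default policy applies.
public class DnsCacheTtl {
    static String positiveTtl() {
        return Security.getProperty("networkaddress.cache.ttl");
    }
    static String negativeTtl() {
        return Security.getProperty("networkaddress.cache.negative.ttl");
    }
}
```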

On 2021/05/14 12:55:00, Michał Łowicki  wrote: 
> Hey,
> 
> We had an incident where one broker died and later got a different IP
> address. Some clients / pods (everything lives on K8s) detected the IP
> change and logged:
> 
> [Producer clientId=producer-1] Hostname for node 1 changed from  to
> > .
> 
> 
> (logged by org.apache.kafka.clients.ClusterConnectionStates) but many
> didn't and they couldn't actually send anything, continuously timing out.
> Any clue whether there are known issues with that, and whether something
> has been fixed in versions > 2.4.1?
>
> In the meantime I'm also checking DNS caching; there's nothing at the OS
> level, but I'm still verifying the JVM (I'm using Java 14).
> 
> -- 
> BR,
> Michał Łowicki
> 


Java client 2.4.1 doesn't always handle IP changes

2021-05-14 Thread Michał Łowicki
Hey,

We had an incident where one broker died and later got a different IP
address. Some clients / pods (everything lives on K8s) detected the IP
change and logged:

[Producer clientId=producer-1] Hostname for node 1 changed from  to
> .


(logged by org.apache.kafka.clients.ClusterConnectionStates) but many
didn't, and they couldn't actually send anything, continuously timing out.
Any clue whether there are known issues with that, and whether something
has been fixed in versions > 2.4.1?

In the meantime I'm also checking DNS caching; there's nothing at the OS
level, but I'm still verifying the JVM (I'm using Java 14).

-- 
BR,
Michał Łowicki


Mechanism similar to __consumer_offsets but for consumer.endOffsets

2020-10-28 Thread Michał Łowicki
Hey,

Is there any way to continuously get end offsets for all partitions of a
topic, or across the whole cluster, the way consumer offsets are exposed
via __consumer_offsets? Periodically polling consumer.endOffsets
<https://github.com/apache/kafka/blob/dffc7f8c30824cb6bf38c05838a466488cbb1f81/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java#L239>
is one option, but maybe it can be done in a different way.
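In case it helps, the periodic-polling option usually ends up shaped like this (a sketch; `fetchEndOffsets` stands in for a call like consumer.endOffsets(partitions) made from a thread owning its own Consumer instance; nothing here is Kafka API):

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Sketch of periodic polling for end offsets: a single scheduler thread
// calls the supplied fetcher and hands each snapshot to a callback.
public class EndOffsetsPoller implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public void start(Supplier<Map<Integer, Long>> fetchEndOffsets,
                      Consumer<Map<Integer, Long>> onOffsets,
                      long periodMillis) {
        scheduler.scheduleAtFixedRate(
                () -> onOffsets.accept(fetchEndOffsets.get()),
                0, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```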

-- 
BR,
Michał Łowicki