Granting permission for jira

2021-05-17 Thread ping lv
Please grant Jira permission to luoen.


Re: kafka command is not working

2021-05-17 Thread Ran Lupovich
In the bootstrap server list, put at least two broker nodes of the cluster
so the client can fetch the metadata for the initial connection.
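For example (a sketch with hypothetical broker host names), the consumer-groups
command from the thread below could be pointed at several brokers so that one
stopped broker does not block bootstrapping:

```
bin/kafka-consumer-groups.sh --describe --group pfsense_user \
  --bootstrap-server broker1:9092,broker2:9092,broker3:9092
```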

On Mon, 17 May 2021 at 18:07, Aniket Pant wrote:

> Hi team,
> my question is, I have a 3-node Kafka cluster, and when I stop one broker
> I cannot check consumer lag; it shows me an error like
> [log and command snipped; quoted in full in the original message below]


Upgrading Apache Kafka 2.7 to Java 11 Changes authenticationID sent to ZooKeeper Enabling Only 1 Kafka Broker to r/w znodes

2021-05-17 Thread Chapin, Ryan
I am having some trouble upgrading a kerberized Kafka 2.7 cluster to Java 11. 
Once upgraded, Kafka no longer sends the expected authenticationID to ZooKeeper 
and the ACLs on the znodes now only allow access to the Kafka host that first 
created them. Complete details of this problem can be found here: 
https://www.ryanchapin.com/upgrading-apache-kafka-2-7-to-java-11-changes-authenticationid-sent-to-zookeeper-enabling-only-1-kafka-broker-to-r-w-znodes/
 Any help is greatly appreciated.

Ryan Chapin
Principal Engineer II/Big Data Architect
Network Management and Architecture
Hughes Network Systems
An EchoStar Company


Re: Application state persistence in Kafka

2021-05-17 Thread mangat rai
Urko,

You can enable changelog topics for your state store. This will make the
application persist the store's data to a Kafka topic. The next time the
application starts, it will first rebuild its state from this topic. Are
you using KStreams or the low-level Processor API?
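As a minimal sketch of what that can look like with the Processor API (the
store name and serdes are illustrative, not from this thread), a persistent
store backed by a changelog topic can be registered like this:

```java
import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ChangelogStoreExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Persistent store backed by a changelog topic; on restart, Streams
        // replays the changelog to rebuild the local state.
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("previous-event-store"), // hypothetical name
                    Serdes.String(),
                    Serdes.String())
                .withLoggingEnabled(Collections.emptyMap()); // changelog on, default topic configs

        builder.addStateStore(storeBuilder);
        // ... attach processors that read/write "previous-event-store" ...
    }
}
```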

Regards,
Mangat

On Mon, May 17, 2021 at 5:30 PM Urko Lekuona  wrote:

> [message quoted in full below]


Application state persistence in Kafka

2021-05-17 Thread Urko Lekuona
Hello,

I have a question regarding the use of Kafka/Kafka Streams to store the
state of a stateful application.

My application filters a stream based on a value from the previous
event of the stream. For example, if the previous car with the same model
was red, this car cannot be red. Previously, I was saving this state
in memory (a map), where I could query the value of the previous event and
overwrite it if necessary.

The problem with this implementation is that I lose the state if my
application crashes, which is something I can't afford. I thought about
storing it in a DB, but as I'm using Kafka, I've decided, and managed, to
store it there. Basically, I'm creating a GlobalKTable and using the
Streams facade (KafkaStreams.store) to query its contents. This is
working, but I'm not familiar enough with the Kafka environment to
understand its implications.
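A minimal sketch of that lookup (the store name and key are hypothetical;
assumes a GlobalKTable materialized under that name on a running KafkaStreams
instance):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class PreviousEventLookup {
    // Assumes a GlobalKTable materialized as "previous-car-store"
    // (hypothetical name) on an already-running KafkaStreams instance.
    static String previousColor(KafkaStreams streams, String model) {
        ReadOnlyKeyValueStore<String, String> store =
            streams.store(StoreQueryParameters.fromNameAndType(
                "previous-car-store", QueryableStoreTypes.keyValueStore()));
        return store.get(model); // value of the previous event for this model
    }
}
```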

So that's why I come for help. Am I doing this right? Should or shouldn't I
do this?

P.S. Message retention time shouldn't be an issue in my case, as this
information expires after a day, so as long as I can retain the events that
long, I'll be fine.
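For a day's retention (a sketch; the topic name is hypothetical and the value
is 24 hours in milliseconds), the backing topic could be configured like:

```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name previous-car-topic \
  --add-config retention.ms=86400000
```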

Thanks in advance,

Urko


kafka command is not working

2021-05-17 Thread Aniket Pant
Hi team,
my question is, I have a 3-node Kafka cluster, and when I stop one broker
I cannot check consumer lag; it shows me an error like
```
[2021-05-17 19:20:22,583] WARN [AdminClient clientId=adminclient-1]
Connection to node -1 (localhost/xx.xx.xx.xx:9092) could not be
established. Broker may not be available.
(org.apache.kafka.clients.NetworkClient)
[... the same WARN repeats with increasing backoff through 19:20:26,704 ...]
Error: Executing consumer group command failed due to
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a
node assignment.
java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a
node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:331)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:251)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out
waiting for a node assignment.
```
I was running `bin/kafka-consumer-groups.sh --describe --group pfsense_user
--all-topics --bootstrap-server xx.xx.xx.xx:9092`


Re: Is bootstrap.servers resolved only once?

2021-05-17 Thread Shilin Wu
Yes.

On initial bootstrapping, DNS resolves all IPs (as of 2.1.0) if you set
client.dns.lookup to use all addresses.

But DNS is evaluated only once, in the producer's initialisation
(constructor), so it will fail if all broker IPs change at once.


On Mon, 17 May 2021 at 9:42 PM, Michał Łowicki  wrote:

> [deeply nested quote snipped; the full exchange appears in the following
> messages]

Re: Is bootstrap.servers resolved only once?

2021-05-17 Thread Michał Łowicki
On Mon, May 17, 2021 at 2:25 PM Shilin Wu  wrote:

> Bootstrap servers are just used for the initial connection; clients will get
> all server metadata from one of the bootstrap servers to discover the full
> cluster membership (which may change dynamically). This list does not have
> to contain the full set of servers (you may want more than one, though, in
> case a server used for bootstrapping is down).
>
> Clients (producers or consumers) make use of all servers irrespective of
> which servers are specified in bootstrap.servers for bootstrapping.
>
> In case of cluster change (e.g. new ip addresses for new servers), the
> clients will receive updates from the server group.
>
> To answer your question:
> 1. Does it mean that if *bootstrap.servers* contains only one address which
> resolves to multiple IP addresses then still only one IP will be taken into
> account during bootstrap?
> => It shouldn't matter. As long as it resolves to at least one functioning
> broker, it will work. Make sure the advertised listeners are configured
> correctly and are externally accessible.
> All actual server metadata is retrieved from the functioning broker and
> updated accordingly.
>

AFAIU it can matter in one (probably minor) scenario: KafkaProducer
resolves the DNS name to just one IP (the first / a random one), and the
node dies before the producer connects to it to fetch cluster metadata,
so no other IP is tried. If the name resolved to all addresses, it could
try the others. Currently KafkaProducer just gives up, though it would be
easy to add retry logic.


>
> 2. What happens during a catastrophic scenario where all brokers die and
> after restart they get different IPs? Will *bootstrap.servers* ever be
> re-evaluated so the clients could re-connect to the cluster, assuming
> that bootstrap.servers will resolve to new IPs?
> => If you set client.dns.lookup="use_all_dns_ips" in your client
> configuration, it will use all of the IP addresses returned by DNS, not
> just the first (or a random one).
>
> What if all of the IP addresses change, and you still have the same DNS
> name, but it points to completely new host IPs?
>
> From
>
> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
> ,
> it seems that the address resolution for bootstrap server is done only once
> in the constructor. So clients may have to be restarted in this case.
> See line 413.
>
> However, if you maintain an active cluster through a controlled change
> process (not due to a fault) and restart the servers, changing IP addresses
> one by one, it should be possible to change the IP addresses gradually, as
> the metadata update should happen in a timely manner.
>
> In short, don't kill them all and restart them at the same time with new
> addresses.
>
> After all, you can always add an extra retry loop to re-create the
> producer with the same producer config; if the Java DNS cache TTL expires,
> it may pick up new addresses eventually. This may take long though.
>

Yeah, we had a scenario where, because of human error, the cluster on K8s
was wiped out and recreated quickly, but all IPs changed and consumers /
producers couldn't reconnect without being restarted. This can still be
patched in the app, e.g. by checking for n consecutive send failures and
recreating the producer then, as sketched below.
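A hedged sketch of that patch (the failure threshold and record types are
illustrative): count consecutive send failures and rebuild the producer,
whose constructor re-resolves bootstrap.servers:

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RecreatingProducer {
    private final Properties config;
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    private KafkaProducer<String, String> producer;

    RecreatingProducer(Properties config) {
        this.config = config;
        this.producer = new KafkaProducer<>(config);
    }

    synchronized void send(ProducerRecord<String, String> record) {
        if (consecutiveFailures.get() >= 5) { // illustrative threshold
            producer.close();
            producer = new KafkaProducer<>(config); // DNS is resolved again here
            consecutiveFailures.set(0);
        }
        producer.send(record, (metadata, exception) -> {
            if (exception == null) consecutiveFailures.set(0);
            else consecutiveFailures.incrementAndGet();
        });
    }
}
```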



Re: Is bootstrap.servers resolved only once?

2021-05-17 Thread Shilin Wu
Bootstrap servers are just used for the initial connection; clients will get
all server metadata from one of the bootstrap servers to discover the full
cluster membership (which may change dynamically). This list does not have
to contain the full set of servers (you may want more than one, though, in
case a server used for bootstrapping is down).

Clients (producers or consumers) make use of all servers irrespective of
which servers are specified in bootstrap.servers for bootstrapping.

In case of cluster change (e.g. new ip addresses for new servers), the
clients will receive updates from the server group.

To answer your question:
1. Does it mean that if *bootstrap.servers* contains only one address which
resolves to multiple IP addresses then still only one IP will be taken into
account during bootstrap?
=> It shouldn't matter. As long as it resolves to at least one functioning
broker, it will work. Make sure the advertised listeners are configured
correctly and are externally accessible.
All actual server metadata is retrieved from the functioning broker and
updated accordingly.

2. What happens during a catastrophic scenario where all brokers die and
after restart they get different IPs? Will *bootstrap.servers* ever be
re-evaluated so the clients could re-connect to the cluster, assuming
that bootstrap.servers will resolve to new IPs?
=> If you set client.dns.lookup="use_all_dns_ips" in your client
configuration, it will use all of the IP addresses returned by DNS, not
just the first (or a random one).
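A minimal sketch of that client setting (the bootstrap host name is
hypothetical):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class DnsLookupExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9092"); // hypothetical
        props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "use_all_dns_ips"); // try every resolved IP
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Bootstrapping will now try all IPs the DNS name resolves to.
        }
    }
}
```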

What if all of the IP addresses change, and you still have the same DNS
name, but it points to completely new host IPs?

From
https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
,
it seems that the address resolution for bootstrap server is done only once
in the constructor. So clients may have to be restarted in this case.
See line 413.

However, if you maintain an active cluster through a controlled change
process (not due to a fault) and restart the servers, changing IP addresses
one by one, it should be possible to change the IP addresses gradually, as
the metadata update should happen in a timely manner.

In short, don't kill them all and restart them at the same time with new
addresses.

After all, you can always add an extra retry loop to re-create the
producer with the same producer config; if the Java DNS cache TTL expires,
it may pick up new addresses eventually. This may take long though.
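The JVM-level DNS cache mentioned above can be tuned via a security property;
it must be set before the JVM performs its first name lookup, and the value
here is illustrative:

```java
public class DnsTtlExample {
    public static void main(String[] args) {
        // Shorten the JVM's positive DNS cache so names are re-resolved sooner.
        // Must run before the first lookup; "30" seconds is an illustrative value.
        java.security.Security.setProperty("networkaddress.cache.ttl", "30");
    }
}
```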





Wu Shilin
Solution Architect
+6581007012


On Mon, May 17, 2021 at 8:03 PM Michał Łowicki  wrote:

> [original question quoted in full below]


Is bootstrap.servers resolved only once?

2021-05-17 Thread Michał Łowicki
Hey,

Trying to understand how *bootstrap.servers* is handled for KafkaProducer.
I see that it's processed during creation of the producer (here
<https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L414>)
and later, if client DNS lookup is set to "default", it's resolved to
only one IP address (here
<https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java#L72>).

1. Does it mean that if *bootstrap.servers* contains only one address which
resolves to multiple IP addresses, then still only one IP will be taken into
account during bootstrap?
2. What happens during a catastrophic scenario where all brokers die and
after restart they get different IPs? Will *bootstrap.servers* ever be
re-evaluated so the clients could re-connect to the cluster, assuming
that bootstrap.servers will resolve to new IPs?

-- 
BR,
Michał Łowicki


a zk-session-expiry issue

2021-05-17 Thread 熊 永鑫
Hi,

When we use Kafka, we hit a strange issue:

Once Kafka gets a zk-session-expiry exception, a large number of ISR
shrinks occur and ZooKeeper request latency becomes high.

Metrics from JMX: [two inline metric screenshots omitted]

When we get the broker's jstack, we find the
zk-session-expiry-handler0-SendThread running continuously in the broker's
process. [jstack screenshot omitted]



Our Kafka version is 2.2.0. We found an issue
(https://issues.apache.org/jira/browse/KAFKA-8121) which may be related to
this. Is this a fatal issue?