Re: Kraft controller readiness checks

2024-04-21 Thread Dima Brodsky
Thanks Luke, this helps for our use case. It does not cover the buildout of a
new cluster where there are no brokers yet, but that should be remedied by
KIP-919, which looks to be resolved in 3.7.0.
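
For the record, once on 3.7.0 the quorum should be inspectable straight from
the controllers, with no brokers up at all. A minimal sketch, assuming the
bootstrap.controllers client config that KIP-919 introduces (the controller
addresses are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.QuorumInfo;

public class NewClusterQuorumProbe {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // KIP-919: bootstrap against the controllers instead of the brokers
        props.put("bootstrap.controllers",
                "controller-0:9093,controller-1:9093,controller-2:9093");
        try (Admin admin = Admin.create(props)) {
            QuorumInfo quorum =
                    admin.describeMetadataQuorum().quorumInfo().get();
            System.out.println("leader=" + quorum.leaderId()
                    + ", voters=" + quorum.voters().size());
        }
    }
}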

ttyl
Dima


On Sun, Apr 21, 2024 at 9:06 PM Luke Chen wrote:

> Hi Frank,
>
> About your question:
> > Unless this is already available but not well publicised in the
> documentation, ideally there should be a protocol available on the
> controller ports that answers operational questions like “are metadata
> partitions in sync?”, “has the current controller converged with other
> members of the quorum?”.
>
> I'm sorry, but the KRaft controller uses the Raft protocol, so there is no
> "in-sync replica" definition as in the data replication protocol. What we
> did for our check is described here
> <https://github.com/strimzi/proposals/blob/main/060-kafka-roller-kraft.md#the-new-quorum-check>.
> In short, we use `controller.quorum.fetch.timeout.ms` and
> `replicaLastCaughtUpTimestamp` to determine if it's safe to roll this
> controller pod.
>
> Hope this helps.
>
> Thank you.
> Luke
>
>
>
>
> On Fri, Apr 19, 2024 at 5:06 PM Francesco Burato wrote:
>
> > Hi Luke,
> >
> > Thanks for the answers. I understand what you are describing in terms of
> > the rationale for using just the availability of the controller port to
> > determine the readiness of the controller, but that is not fully
> > satisfying from an operational perspective, at least given the lack of
> > sufficient documentation on the matter. Based on my understanding of
> > KRaft, which I admit is limited, the controllers host the cluster
> > metadata partitions on disk and make them available to the brokers. So,
> > presumably, one of the purposes of the controllers is to ensure that the
> > metadata partitions are properly replicated. Hence, what happens if, even
> > in a non-k8s environment, all controllers go down? What sort of outage
> > does the wider cluster experience in that circumstance?
> >
> > A complete outage of the controllers is of course an extreme scenario,
> > but a more likely one is that a controller's disk goes offline and needs
> > to be replaced. In this scenario, the controller will have to reconstruct
> > the cluster metadata from scratch from the other controllers in the
> > quorum, but it presumably cannot participate in the quorum until the
> > metadata partitions are fully replicated. Based on this assumption, the
> > mere availability of the controller port does not necessarily mean that I
> > can safely shut down another controller, because replication may not have
> > completed yet.
> >
> > As I mentioned earlier, I don’t know KRaft in sufficient detail to
> > evaluate whether my assumptions are warranted, but the official
> > documentation does not seem to go into much detail on how to safely
> > operate a cluster in KRaft mode, while it provides very good information
> > on how to safely operate a ZK cluster by highlighting that URPs and
> > leader elections must be kept under control during restarts.
> >
> > Unless this is already available but not well publicised in the
> > documentation, ideally there should be a protocol available on the
> > controller ports that answers operational questions like “are metadata
> > partitions in sync?”, “has the current controller converged with other
> > members of the quorum?”.
> >
> > It goes without saying that if any of these topics are properly covered
> > anywhere in the docs, I'm more than happy to be RTFMed to the right place.
> >
> > As for the other points you raise: we have a very particular set-up for
> > our Kafka clusters that makes the circumstance you highlight not a
> > problem. In particular, our consumers and producers are all internal to a
> > namespace and can connect to non-ready brokers. Given that the URP script
> > checks the global URP state rather than just the URP state of the
> > individual broker, as long as even one broker is marked as ready, the
> > entire cluster is safe. With the ordered rotation imposed by the
> > statefulset parallel rolling restart, together with the URP readiness
> > check and the PDB, we are guaranteed not to cause any read or write
> > errors. Rotations are rather long, but we don’t really care about speed.
> >
> > Thanks,
> >
> > Frank
> >
> > --
> > Francesco Burato | Software Development Engineer | Adobe |
> > bur...@adobe.com  | c. +44 747
> > 9029370
> >
> >
> > From: Luke Chen 
> > Date: Friday, 19 April 2024 at 05:21
> > To: users@kafka.apache.org 
> > Subject: Re: Kraft controller readiness checks
> > EXTERNAL: Use caution when clicking on links or opening attachments.
> >
> >
> > Hello Frank,
> >
> > That's a good question.
> > I think we all know there is no "correct" answer for this question. But I
> > can share with you what our team did for it.
> >
> > Readiness: controller is listening on the controller.listener.names
> 

Re: Kraft controller readiness checks

2024-04-21 Thread Luke Chen
Hi Frank,

About your question:
> Unless this is already available but not well publicised in the
documentation, ideally there should be a protocol available on the controller
ports that answers operational questions like “are metadata partitions in
sync?”, “has the current controller converged with other members of the
quorum?”.

I'm sorry, but the KRaft controller uses the Raft protocol, so there is no
"in-sync replica" definition as in the data replication protocol. What we did
for our check is described here
<https://github.com/strimzi/proposals/blob/main/060-kafka-roller-kraft.md#the-new-quorum-check>.
In short, we use `controller.quorum.fetch.timeout.ms` and
`replicaLastCaughtUpTimestamp` to determine if it's safe to roll this
controller pod.
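
A minimal sketch of that check with the Java Admin client (assuming Kafka
3.3+, where KIP-836 exposes the quorum state; the node id and timeout are
illustrative):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.QuorumInfo;

public class QuorumRollCheck {
    // It should be safe to roll `nodeId` when every *other* voter has
    // caught up with the quorum leader within the configured
    // controller.quorum.fetch.timeout.ms window.
    static boolean safeToRoll(Admin admin, int nodeId, long fetchTimeoutMs)
            throws Exception {
        QuorumInfo quorum = admin.describeMetadataQuorum().quorumInfo().get();
        long now = System.currentTimeMillis();
        return quorum.voters().stream()
                .filter(v -> v.replicaId() != nodeId) // skip the rolled pod
                .allMatch(v -> v.lastCaughtUpTimestamp().isPresent()
                        && now - v.lastCaughtUpTimestamp().getAsLong()
                                < fetchTimeoutMs);
    }
}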

Hope this helps.

Thank you.
Luke




On Fri, Apr 19, 2024 at 5:06 PM Francesco Burato 
wrote:

> Hi Luke,
>
> Thanks for the answers. I understand what you are describing in terms of
> the rationale for using just the availability of the controller port to
> determine the readiness of the controller, but that is not fully satisfying
> from an operational perspective, at least given the lack of sufficient
> documentation on the matter. Based on my understanding of KRaft, which I
> admit is limited, the controllers host the cluster metadata partitions on
> disk and make them available to the brokers. So, presumably, one of the
> purposes of the controllers is to ensure that the metadata partitions are
> properly replicated. Hence, what happens if, even in a non-k8s environment,
> all controllers go down? What sort of outage does the wider cluster
> experience in that circumstance?
>
> A complete outage of the controllers is of course an extreme scenario, but
> a more likely one is that a controller's disk goes offline and needs to be
> replaced. In this scenario, the controller will have to reconstruct the
> cluster metadata from scratch from the other controllers in the quorum, but
> it presumably cannot participate in the quorum until the metadata
> partitions are fully replicated. Based on this assumption, the mere
> availability of the controller port does not necessarily mean that I can
> safely shut down another controller, because replication may not have
> completed yet.
>
> As I mentioned earlier, I don’t know KRaft in sufficient detail to evaluate
> whether my assumptions are warranted, but the official documentation does
> not seem to go into much detail on how to safely operate a cluster in KRaft
> mode, while it provides very good information on how to safely operate a ZK
> cluster by highlighting that URPs and leader elections must be kept under
> control during restarts.
>
> Unless this is already available but not well publicised in the
> documentation, ideally there should be a protocol available on the
> controller ports that answers operational questions like “are metadata
> partitions in sync?”, “has the current controller converged with other
> members of the quorum?”.
>
> It goes without saying that if any of these topics are properly covered
> anywhere in the docs, I'm more than happy to be RTFMed to the right place.
>
> As for the other points you raise: we have a very particular set-up for
> our Kafka clusters that makes the circumstance you highlight not a problem.
> In particular, our consumers and producers are all internal to a namespace
> and can connect to non-ready brokers. Given that the URP script checks the
> global URP state rather than just the URP state of the individual broker,
> as long as even one broker is marked as ready, the entire cluster is safe.
> With the ordered rotation imposed by the statefulset parallel rolling
> restart, together with the URP readiness check and the PDB, we are
> guaranteed not to cause any read or write errors. Rotations are rather
> long, but we don’t really care about speed.
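>
> For illustration, a minimal sketch of such a global URP check with the Java
> Admin client (not our actual script, just the logic; names are
> illustrative):
>
> import java.util.Set;
> import org.apache.kafka.clients.admin.Admin;
>
> public class GlobalUrpCheck {
>     // Counts under-replicated partitions across the whole cluster: a
>     // partition is under-replicated when its ISR is smaller than its
>     // replica set.
>     static long globalUrpCount(Admin admin) throws Exception {
>         Set<String> topics = admin.listTopics().names().get();
>         return admin.describeTopics(topics).allTopicNames().get()
>                 .values().stream()
>                 .flatMap(td -> td.partitions().stream())
>                 .filter(p -> p.isr().size() < p.replicas().size())
>                 .count();
>     }
> }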
>
> Thanks,
>
> Frank
>
> --
> Francesco Burato | Software Development Engineer | Adobe |
> bur...@adobe.com  | c. +44 747
> 9029370
>
>
> From: Luke Chen 
> Date: Friday, 19 April 2024 at 05:21
> To: users@kafka.apache.org 
> Subject: Re: Kraft controller readiness checks
> EXTERNAL: Use caution when clicking on links or opening attachments.
>
>
> Hello Frank,
>
> That's a good question.
> I think we all know there is no "correct" answer for this question. But I
> can share with you what our team did for it.
>
> Readiness: controller is listening on the controller.listener.names
>
> The rationale behind it is:
> 1. The last step of controller node startup is to wait until all the
> SocketServer ports are open and the Acceptors are started, and the
> controller port is one of them.
> 2. This controller listener is used to talk to other controllers (voters)
> to form the raft quorum, so if it is not open and listening, the controller
> is basically not working at all.
> 3. The controller listener is also used for brokers (observers) to get the
> 

Re: Failed to initialize processor KSTREAM-AGGREGATE-0000000001

2024-04-21 Thread Matthias J. Sax
Not sure either, but it sounds like a bug to me. Can you reproduce this 
reliably? What version are you using?


It would be best if you could file a Jira ticket and we can take it from 
there.



-Matthias

On 4/21/24 5:38 PM, Penumarthi Durga Prasad Chowdary wrote:

Hi,
I have an issue in Kafka Streams while constructing state store windows
(TimeWindow and SessionWindow). While processing data, the Kafka Streams
process intermittently throws the error below:
ThreadName:
kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
  Message: stream-client [ kafka-streams-exec-0-test-store
-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams
uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: failed to initialize
processor KSTREAM-AGGREGATE-0000000001
   at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
   at
org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
   at
org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
   at
org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
   at
org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
   at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
   at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
   at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
  Caused by: java.lang.NullPointerException
   at
org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
   at
org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
   at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
   ... 7 more
My understanding is that the state store is null when stateStore.flush() is
invoked to send data to the state store, and this leads to the above error.
The error can be caught inside the Kafka Streams setUncaughtExceptionHandler:
streams.setUncaughtExceptionHandler(throwable -> {
    LOGGER.error("Exception in streams", throwable);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
I'm uncertain about the exact reason for this issue. Everything seems to be
in order, including the Kafka cluster, and there are no errors in Kafka
Streams except for a few logs indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen?
I would like to express my gratitude in advance for any assistance provided.


Failed to initialize processor KSTREAM-AGGREGATE-0000000001

2024-04-21 Thread Penumarthi Durga Prasad Chowdary
Hi,
I have an issue in Kafka Streams while constructing state store windows
(TimeWindow and SessionWindow). While processing data, the Kafka Streams
process intermittently throws the error below:
ThreadName:
kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
 Message: stream-client [ kafka-streams-exec-0-test-store
-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams
uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: failed to initialize
processor KSTREAM-AGGREGATE-0000000001
  at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
  at
org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
  at
org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
  at
org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
  at
org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
  at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
  at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
  at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
 Caused by: java.lang.NullPointerException
  at
org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
  at
org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
  at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
  ... 7 more
My understanding is that the state store is null when stateStore.flush() is
invoked to send data to the state store, and this leads to the above error.
The error can be caught inside the Kafka Streams setUncaughtExceptionHandler:
streams.setUncaughtExceptionHandler(throwable -> {
    LOGGER.error("Exception in streams", throwable);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
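
For comparison, a more selective variant of the same handler (a sketch only,
using the KIP-671 responses; whether REPLACE_THREAD is appropriate here
depends on the actual root cause):

import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

streams.setUncaughtExceptionHandler(throwable -> {
    LOGGER.error("Exception in streams", throwable);
    // Replace the thread only for this known transient initialization
    // failure; shut down the client on anything unexpected, so that a
    // real bug does not loop through endless thread replacement.
    if (throwable instanceof StreamsException
            && throwable.getCause() instanceof NullPointerException) {
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
    return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});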
I'm uncertain about the exact reason for this issue. Everything seems to be
in order, including the Kafka cluster, and there are no errors in Kafka
Streams except for a few logs indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen?
I would like to express my gratitude in advance for any assistance provided.
-- 


Thanks,
Prasad,
91-9030546248.


SQL Client fails with ClassNotFoundException when Hive dialect is chosen

2024-04-21 Thread Ilya Karpov
Hi,

I have a *Flink 1.19* installation and start the *sql-client* locally, like this:
*./bin/sql-client.sh*
then execute *SET 'table.sql-dialect' = 'hive'*;
and get this exception:


*[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.antlr.runtime.tree.Tree*

I'm pretty sure that *I do have antlr in the classpath*, because I found it
there using the `ps` command (see text in *bold*):
ps -Af | grep java | grep client | grep antlr
501  1131   993   0 11:30PM ttys000    0:03.27 java
-XX:+IgnoreUnrecognizedVMOptions
--add-exports=java.base/sun.net.util=ALL-UNNAMED
--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.text=ALL-UNNAMED
--add-opens=java.base/java.time=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
-Dlog.file=/Users/ilyakarpov/Downloads/flink-1.19.0/log/flink-ilyakarpov-sql-client-192.168.1.8.log
-Dlog4j.configuration=file:/Users/ilyakarpov/Downloads/flink-1.19.0/conf/log4j-cli.properties
-Dlog4j.configurationFile=file:/Users/ilyakarpov/Downloads/flink-1.19.0/conf/log4j-cli.properties
-Dlogback.configurationFile=file:/Users/ilyakarpov/Downloads/flink-1.19.0/conf/logback.xml
-classpath
*/Users/ilyakarpov/Downloads/flink-1.19.0/lib/antlr-runtime-3.5.2.jar*:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/flink-cep-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/flink-connector-files-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/flink-connector-hive_2.12-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/flink-csv-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/flink-json-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/flink-scala_2.12-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/flink-table-api-java-uber-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/flink-table-planner-loader-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/flink-table-runtime-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/hive-exec-3.1.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/libfb303-0.9.3.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/log4j-1.2-api-2.17.1.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/log4j-api-2.17.1.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/log4j-core-2.17.1.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/log4j-slf4j-impl-2.17.1.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/lib/flink-dist-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/opt/flink-python-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/opt/flink-sql-gateway-1.19.0.jar:/Users/ilyakarpov/Downloads/flink-1.19.0/opt/flink-sql-client-1.19.0.jar:::
org.apache.flink.table.client.SqlClient --jar
/Users/ilyakarpov/Downloads/flink-1.19.0/opt/flink-sql-client-1.19.0.jar
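
To make sure the class is actually loadable (and not just that the jar file
is present on disk), a quick probe can be run with the same -classpath as the
sql client. A minimal sketch, with the class name taken from the error above:

public class AntlrProbe {
    public static void main(String[] args) throws Exception {
        // throws ClassNotFoundException if antlr-runtime is not visible here
        Class<?> tree = Class.forName("org.antlr.runtime.tree.Tree");
        System.out.println("Loaded " + tree.getName() + " from "
                + tree.getProtectionDomain().getCodeSource().getLocation());
    }
}

(I am not sure whether the isolated classloader used by
flink-table-planner-loader sees the same jars as the top-level classpath,
which might be where the difference comes from.)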

I double-checked my lib folder:
ll ~/Downloads/flink-1.19.0/lib
total 502784
drwxr-xr-x@ 19 ilyakarpov  staff        608 Apr 21 23:09 .
drwxr-xr-x@ 13 ilyakarpov  staff        416 Mar  7 08:49 ..
*-rw-r--r--@  1 ilyakarpov  staff     167761 Apr 21 23:06 antlr-runtime-3.5.2.jar*
-rw-r--r--@  1 ilyakarpov  staff     198148 Mar  7 08:35 flink-cep-1.19.0.jar
-rw-r--r--@  1 ilyakarpov  staff     558298 Mar  7 08:38 flink-connector-files-1.19.0.jar
-rw-r--r--@  1 ilyakarpov  staff    8945307 Apr 21 23:05 flink-connector-hive_2.12-1.19.0.jar
-rw-r--r--@  1 ilyakarpov  staff     102375 Mar  7 08:42 flink-csv-1.19.0.jar
-rw-r--r--@  1 ilyakarpov  staff  125658127 Mar  7 08:49 flink-dist-1.19.0.jar
-rw-r--r--@  1 ilyakarpov  staff     202905 Mar  7 08:42 flink-json-1.19.0.jar
-rw-r--r--@  1 ilyakarpov  staff   21060379 Mar  7 08:47 flink-scala_2.12-1.19.0.jar
-rw-r--r--@  1 ilyakarpov  staff   15621700 Mar  7 08:48 flink-table-api-java-uber-1.19.0.jar
-rw-r--r--@  1 ilyakarpov  staff   38130033 Mar  7 08:47 flink-table-planner-loader-1.19.0.jar
-rw-r--r--@  1 ilyakarpov  staff    3514238 Mar  7 08:36 flink-table-runtime-1.19.0.jar
-rw-r--r--@  1 ilyakarpov  staff   40603464 Apr 21 23:09 hive-exec-3.1.0.jar
-rw-r--r--@  1 ilyakarpov  staff     313702 Apr 21 23:08 libfb303-0.9.3.jar
-rw-r--r--@  1 ilyakarpov  staff     208006 Mar  4 18:46 log4j-1.2-api-2.17.1.jar
-rw-r--r--@  1 ilyakarpov  staff     301872 Mar  4 18:46