Yogesh created KAFKA-16201:
------------------------------

             Summary: Kafka exception - org.apache.kafka.common.errors.NotLeaderOrFollowerException
                 Key: KAFKA-16201
                 URL: https://issues.apache.org/jira/browse/KAFKA-16201
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 3.6.1
         Environment: AWS EKS
            Reporter: Yogesh


I am deploying Kafka inside a Kubernetes cluster (AWS EKS) in HA mode (multiple brokers). The deployment consists of:
 * Kubernetes
 * Kafka 3.6.1

Refer to the following files used in the deployment.

Dockerfile

{code:java}
FROM eclipse-temurin:17.0.9_9-jdk-jammy

ENV KAFKA_VERSION=3.6.1
ENV SCALA_VERSION=2.13
ENV KAFKA_HOME=/opt/kafka
ENV PATH=${PATH}:${KAFKA_HOME}/bin

LABEL name="kafka" version=${KAFKA_VERSION}

RUN wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
 && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt \
 && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
 && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME} \
 && rm -rf /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz

COPY ./entrypoint.sh /
RUN ["chmod", "+x", "/entrypoint.sh"]
ENTRYPOINT ["/entrypoint.sh"] {code}
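The image referenced as myimage/kafka-kraft:1.0 in Kafka.yaml below is built from this Dockerfile roughly as follows (only the tag is taken from the StatefulSet spec; the build context and registry push are assumptions):
{code:java}
# Hypothetical build/push of the broker image used by the StatefulSet (tag from Kafka.yaml)
docker build -t myimage/kafka-kraft:1.0 .
docker push myimage/kafka-kraft:1.0
{code}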
 

 

entrypoint.sh

{code:java}
#!/bin/bash

NODE_ID=${HOSTNAME:6}
LISTENERS="SASL://:9092,CONTROLLER://:9093,INTERNAL://:29092"

ADVERTISED_LISTENERS="SASL://kraft-$NODE_ID:9092,INTERNAL://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc.cluster.local:29092"

CONTROLLER_QUORUM_VOTERS=""
for i in $( seq 0 $REPLICAS); do
    if [[ $i != $REPLICAS ]]; then
        CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093,"
    else
        CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS::-1}
    fi
done

mkdir -p $SHARE_DIR/$NODE_ID

if [[ ! -f "$SHARE_DIR/cluster_id" && "$NODE_ID" = "0" ]]; then
    CLUSTER_ID=$(kafka-storage.sh random-uuid)
    echo $CLUSTER_ID > $SHARE_DIR/cluster_id
else
    CLUSTER_ID=$(cat $SHARE_DIR/cluster_id)
fi

sed -e "s+^node.id=.*+node.id=$NODE_ID+" \
-e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
-e "s+^listeners=.*+listeners=$LISTENERS+" \
-e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
-e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \
/opt/kafka/config/kraft/server.properties > server.properties.updated \
&& mv server.properties.updated /opt/kafka/config/kraft/server.properties

JAAS="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\" user_admin=\"admin-secret\";"

echo -e "\nlistener.name.sasl.plain.sasl.jaas.config=${JAAS}" >> /opt/kafka/config/kraft/server.properties
echo -e "\nsasl.enabled.mechanisms=PLAIN" >> /opt/kafka/config/kraft/server.properties
echo -e "\nlistener.security.protocol.map=SASL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT" >> /opt/kafka/config/kraft/server.properties
echo -e "\ninter.broker.listener.name=INTERNAL" >> /opt/kafka/config/kraft/server.properties

kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/server.properties

exec kafka-server-start.sh /opt/kafka/config/kraft/server.properties {code}
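For reference, on the first pod (kafka-0, with REPLICAS=5, SERVICE=kafka-svc, NAMESPACE=kafka-kraft, SHARE_DIR=/mnt/kafka from Kafka.yaml below) this script should leave roughly the following key settings in server.properties. This is reconstructed by hand from the script, not copied from a running pod; everything else stays at the stock kraft/server.properties defaults:
{code:java}
node.id=0
controller.quorum.voters=0@kafka-0.kafka-svc.kafka-kraft.svc.cluster.local:9093,1@kafka-1.kafka-svc.kafka-kraft.svc.cluster.local:9093,2@kafka-2.kafka-svc.kafka-kraft.svc.cluster.local:9093,3@kafka-3.kafka-svc.kafka-kraft.svc.cluster.local:9093,4@kafka-4.kafka-svc.kafka-kraft.svc.cluster.local:9093
listeners=SASL://:9092,CONTROLLER://:9093,INTERNAL://:29092
advertised.listeners=SASL://kraft-0:9092,INTERNAL://kafka-0.kafka-svc.kafka-kraft.svc.cluster.local:29092
log.dirs=/mnt/kafka/0
listener.name.sasl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret";
sasl.enabled.mechanisms=PLAIN
listener.security.protocol.map=SASL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
inter.broker.listener.name=INTERNAL
{code}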
 

 

Kafka.yaml
{code:java}
apiVersion: v1
kind: Namespace
metadata:
  name: kafka-kraft
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-volume
  labels:
    type: local
spec:
  storageClassName: manual
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: '/mnt/data'
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-pv-claim
  namespace: kafka-kraft
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 3Gi
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-app
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  serviceName: kafka-svc
  replicas: 5
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      volumes:
        - name: kafka-storage
          persistentVolumeClaim:
            claimName: kafka-pv-claim
      containers:
        - name: kafka-container
          image: myimage/kafka-kraft:1.0
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: REPLICAS
              value: '5'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: kafka-kraft
            - name: SHARE_DIR
              value: /mnt/kafka
          volumeMounts:
            - name: kafka-storage
              mountPath: /mnt/kafka {code}
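As a sanity check before connecting (these commands are not part of the original steps and assume kubectl access to the cluster), the rollout can be verified with:
{code:java}
# Hypothetical verification that all 5 broker pods are up behind the headless service
kubectl -n kafka-kraft get statefulset kafka
kubectl -n kafka-kraft get pods -l app=kafka-app -o wide
{code}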
After the deployment, all the containers are up and running. I then create a topic from a Windows client using the following command:

{code:java}
.\kafka-topics.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --command-config producer.properties --topic hello --create --replication-factor 5 {code}

producer.properties

{code:java}
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=admin password=admin-secret;
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
metadata.max.age.ms=1000 {code}

I then produce to the topic with the console producer; a prompt is displayed to enter a message. Upon sending a sample text, it throws the following error:

[Producer clientId=console-producer] Received invalid metadata error in produce 
request on partition hello2-1 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now
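The produce step itself was not captured verbatim above; it was along the lines of the following sketch (same bootstrap list and producer.properties as the create command; the topic name is an assumption, and the log above refers to partition hello2-1):
{code:java}
.\kafka-console-producer.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --producer.config producer.properties --topic hello
{code}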

 

What I have tried so far:
 * Tried ZooKeeper and KRaft mode
 * Tried deleting and recreating the topics (this works only intermittently)

Unfortunately, the problem persists and I am not able to produce messages.
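To narrow this down, the leader/ISR assignment and the KRaft quorum status can be checked right after the error occurs. The following is only a sketch reusing the bootstrap list and producer.properties from above; its output is not yet captured in this report:
{code:java}
REM Which broker does the cluster report as leader for each partition, and is the ISR complete?
.\kafka-topics.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --command-config producer.properties --describe --topic hello
REM Is the KRaft controller quorum healthy? (--command-config support assumed, as with the other tools)
.\kafka-metadata-quorum.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --command-config producer.properties describe --status
{code}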


