Re: Switching from ZooKeeper to Kafka KRaft mode / Using ACLs with Kafka KRaft mode

2022-05-16 Thread Thomas Cooper
Hi Florian,

Switching from a ZooKeeper-based cluster to a KRaft-based one is not
currently supported. AFAIK that functionality should be coming in Kafka
3.4 (or possibly later).

Cheers,

Tom

On 16/05/2022 12:42, Florian Blumenstein wrote:
> Hi guys,
>
> I am currently trying to switch from Kafka 3.1.0 with ZooKeeper to Kafka 3.2.0
> in KRaft mode. I adjusted server.properties as follows:
>
> ### KRaft-properties
> process.roles=broker,controller
> node.id=1
> controller.quorum.voters=1@127.0.0.1:9091
> controller.listener.names=CONTROLLER
>
> auto.create.topics.enable=false
> ssl.client.auth=required
>
> ### Enable ACLs
> authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
> allow.everyone.if.no.acl.found=false
>
> # Topics and indexes are stored here to keep track of records sent via broker
> log.dir=/opt/kafka/data/
>
> # Internal Topic Settings  
> #
> # The replication factor for the internal group metadata topics
> # "__consumer_offsets" and "__transaction_state".
> # For anything other than development testing, a value greater than 1
> # (such as 3) is recommended to ensure availability.
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
>
> ### Platform-Configured Entries --- the entries below are configured by the
> platform
> listener.name.docker.ssl.keystore.location=/app/ssl/internalKeystore.jks
> super.users=User:Applications:0765df41-0b31-4db8-8849-c9d77e9c6e20;User:CN=onlinesuiteplus-kafka,OU=Services,O=Company
>  AG,L=City,C=DE
> advertised.listeners=DEVIN://onlinesuiteplus-kafka:29092,DEVOUT://localhost:9092,DOCKER://onlinesuiteplus-kafka:29093,EXTERNAL://localhost:9093
> listener.name.docker.ssl.key.password=password
> inter.broker.listener.name=DOCKER
> listener.name.external.ssl.key.password=password
> listener.name.external.ssl.truststore.password=password
> ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=Applications.*$/Applications:$1/,RULE:^CN=(.*?),OU=Devices.*$/Devices:$1/,DEFAULT
> initial.start=true
> listener.name.docker.ssl.truststore.location=/app/ssl/truststore.jks
> listener.name.external.ssl.keystore.password=password
> listeners=CONTROLLER://:9091,DEVIN://:29092,DEVOUT://:9092,DOCKER://:29093,EXTERNAL://:9093
> listener.name.external.ssl.truststore.location=/app/ssl/truststore.jks
> listener.name.docker.ssl.truststore.password=password
> listener.name.external.ssl.keystore.location=/app/ssl/externalKeystore.jks
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,DEVIN:PLAINTEXT,DEVOUT:PLAINTEXT,DOCKER:SSL,EXTERNAL:SSL
> listener.name.docker.ssl.keystore.password=password
>
> If I now run Kafka with the following script:
>
> if [ "$KAFKA_INITIAL_START" == "true" ]
> then
>   echo "Running kafka-storage.sh because env var KAFKA_INITIAL_START was set to true"
>   "${KAFKA_HOME}"/bin/kafka-storage.sh format \
>     --config "${KAFKA_HOME}"/config/server.properties \
>     --cluster-id "$("${KAFKA_HOME}"/bin/kafka-storage.sh random-uuid)"
> fi
>
> exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"
>
>
> I get the following logs:
>
> [2022-05-16 11:25:08,894] INFO Registered kafka:type=kafka.Log4jController 
> MBean (kafka.utils.Log4jControllerRegistration$)
> [2022-05-16 11:25:09,220] INFO Setting -D 
> jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated 
> TLS renegotiation (org.apache.zookeeper.common.X509Util)
> [2022-05-16 11:25:09,473] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/opt/kafka/data] Loading producer state till offset 0 with message format 
> version 2 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,474] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/opt/kafka/data] Reloading from producer snapshot and rebuilding producer 
> state from offset 0 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,477] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/opt/kafka/data] Producer state recovery took 2ms for snapshot load and 
> 0ms for segment recovery from offset 0 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,584] INFO [raft-expiration-reaper]: Starting 
> (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
> [2022-05-16 11:25:09,784] INFO [RaftManager nodeId=1] Completed transition to 
> Unattached(epoch=0, voters=[1], electionTimeoutMs=1442) 
> (org.apache.kafka.raft.QuorumState)
> [2022-05-16 11:25:09,797] INFO [RaftManager nodeId=1] Completed transition to 
> CandidateState(localId=1, epoch=1, retries=1, electionTimeoutMs=1741) 
> (org.apache.kafka.raft.QuorumState)
> [2022-05-16 11:25:09,810] INFO [RaftManager nodeId=1] Completed transition to 
> Leader(localId=1, epoch=1, epochStartOffset=0, highWatermark=Optional.empty, 
> voterStates={1=ReplicaState(nodeId=1, endOffset=Optional.empty, 
> lastFetchTimestamp=OptionalLong.empty, hasAcknowledgedLeader=true)}) 
> (org.apache.kafka.raft.QuorumState)
> [2022-05-16 11:25

Re: Failed to fetch offset for partition ...: There are unstable offsets that need to be cleared

2022-05-16 Thread Domantas Petrauskas
Sorry for the duplicate thread; the email got delayed for a very long time.
The original thread, with more details:
https://lists.apache.org/thread/36x0gyrpgm2c4yr0qq2hmmngsp8ym2qs

> On 2022-05-13, at 16:08, Domantas Petrauskas wrote:
> 
> Re-creating the consumer group helped. It is still not clear what the cause was.
> 
>> On 2022-05-13, at 11:44, Domantas Petrauskas wrote:
>> 
>> Hello,
>> 
>> I am getting this error in my Kafka consumer:
>> 
>> The following partitions still have unstable offsets which are not cleared 
>> on the broker side: [...], this could be either transactional offsets 
>> waiting for completion, or normal offsets waiting for replication after 
>> appending to local 
>> log\n","stream":"stdout","time":"2022-05-13T08:36:52.691586877Z"}
>> 
>> I tried to find details on how to fix this issue but could not find
>> anything, so I tried resetting the offsets, which did not help. What are
>> the steps to troubleshoot this issue?
>> 
>> Best regards,
>> 
>> Domantas Petrauskas
> 
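
[Editor's note] For reference, the offset reset attempted above can be expressed with Kafka's stock consumer-groups tool. The broker address, group, and topic names below are placeholders; note that --to-latest discards unread records, while --to-earliest would replay them:

```shell
# Hypothetical broker address, group, and topic -- substitute real values.
# Run with no active consumers in the group, otherwise the reset is rejected.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-consumer-group --topic my-topic \
  --reset-offsets --to-latest --execute
```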



Switching from ZooKeeper to Kafka KRaft mode / Using ACLs with Kafka KRaft mode

2022-05-16 Thread Florian Blumenstein
Hi guys,

I am currently trying to switch from Kafka 3.1.0 with ZooKeeper to Kafka 3.2.0
in KRaft mode. I adjusted server.properties as follows:

### KRaft-properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@127.0.0.1:9091
controller.listener.names=CONTROLLER

auto.create.topics.enable=false
ssl.client.auth=required

### Enable ACLs
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
allow.everyone.if.no.acl.found=false
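
[Editor's note] With StandardAuthorizer enabled and allow.everyone.if.no.acl.found=false, every principal needs explicit ACLs before it can do anything. A minimal sketch with the stock tool; the bootstrap address, principal, topic, and client config file below are placeholders:

```shell
# Grant the mapped application principal Read/Describe on one topic.
# client-ssl.properties is a hypothetical client config for the SSL listener.
bin/kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config client-ssl.properties \
  --add --allow-principal "User:Applications:my-app-id" \
  --operation Read --operation Describe \
  --topic my-topic
```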

# Topics and indexes are stored here to keep track of records sent via broker
log.dir=/opt/kafka/data/

# Internal Topic Settings  
#
# The replication factor for the internal group metadata topics
# "__consumer_offsets" and "__transaction_state".
# For anything other than development testing, a value greater than 1
# (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

### Platform-Configured Entries --- the entries below are configured by the
platform
listener.name.docker.ssl.keystore.location=/app/ssl/internalKeystore.jks
super.users=User:Applications:0765df41-0b31-4db8-8849-c9d77e9c6e20;User:CN=onlinesuiteplus-kafka,OU=Services,O=Company
 AG,L=City,C=DE
advertised.listeners=DEVIN://onlinesuiteplus-kafka:29092,DEVOUT://localhost:9092,DOCKER://onlinesuiteplus-kafka:29093,EXTERNAL://localhost:9093
listener.name.docker.ssl.key.password=password
inter.broker.listener.name=DOCKER
listener.name.external.ssl.key.password=password
listener.name.external.ssl.truststore.password=password
ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=Applications.*$/Applications:$1/,RULE:^CN=(.*?),OU=Devices.*$/Devices:$1/,DEFAULT
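
[Editor's note] The first mapping rule above can be sanity-checked outside Kafka. This is a plain sed sketch of what the broker's rule engine does, not the actual implementation; sed's group is greedy where Kafka's Java regex uses a lazy one, which only matters if ",OU=Applications" appears twice in the DN:

```shell
# Apply RULE:^CN=(.*?),OU=Applications.*$/Applications:$1/ by hand with sed.
dn="CN=0765df41-0b31-4db8-8849-c9d77e9c6e20,OU=Applications,O=Company AG,L=City,C=DE"
echo "$dn" | sed -E 's/^CN=(.*),OU=Applications.*$/Applications:\1/'
# -> Applications:0765df41-0b31-4db8-8849-c9d77e9c6e20
```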
initial.start=true
listener.name.docker.ssl.truststore.location=/app/ssl/truststore.jks
listener.name.external.ssl.keystore.password=password
listeners=CONTROLLER://:9091,DEVIN://:29092,DEVOUT://:9092,DOCKER://:29093,EXTERNAL://:9093
listener.name.external.ssl.truststore.location=/app/ssl/truststore.jks
listener.name.docker.ssl.truststore.password=password
listener.name.external.ssl.keystore.location=/app/ssl/externalKeystore.jks
listener.security.protocol.map=CONTROLLER:PLAINTEXT,DEVIN:PLAINTEXT,DEVOUT:PLAINTEXT,DOCKER:SSL,EXTERNAL:SSL
listener.name.docker.ssl.keystore.password=password

If I now run Kafka with the following script:

if [ "$KAFKA_INITIAL_START" == "true" ]
then
  echo "Running kafka-storage.sh because env var KAFKA_INITIAL_START was set to true"
  "${KAFKA_HOME}"/bin/kafka-storage.sh format \
    --config "${KAFKA_HOME}"/config/server.properties \
    --cluster-id "$("${KAFKA_HOME}"/bin/kafka-storage.sh random-uuid)"
fi

exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"
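
[Editor's note] One caveat with the script above: random-uuid produces a fresh cluster id on every invocation, so it is worth capturing the id once rather than generating it inline. A sketch, assuming the same KAFKA_HOME layout:

```shell
# Generate the cluster id once, log it, and reuse it for the format step,
# so the id is known if the data volume ever has to be re-formatted.
CLUSTER_ID="$("${KAFKA_HOME}"/bin/kafka-storage.sh random-uuid)"
echo "Formatting storage with cluster id ${CLUSTER_ID}"
"${KAFKA_HOME}"/bin/kafka-storage.sh format \
  --config "${KAFKA_HOME}"/config/server.properties \
  --cluster-id "${CLUSTER_ID}"
```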


I get the following logs:

[2022-05-16 11:25:08,894] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
[2022-05-16 11:25:09,220] INFO Setting -D 
jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS 
renegotiation (org.apache.zookeeper.common.X509Util)
[2022-05-16 11:25:09,473] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/opt/kafka/data] Loading producer state till offset 0 with message format 
version 2 (kafka.log.UnifiedLog$)
[2022-05-16 11:25:09,474] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/opt/kafka/data] Reloading from producer snapshot and rebuilding producer 
state from offset 0 (kafka.log.UnifiedLog$)
[2022-05-16 11:25:09,477] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/opt/kafka/data] Producer state recovery took 2ms for snapshot load and 0ms 
for segment recovery from offset 0 (kafka.log.UnifiedLog$)
[2022-05-16 11:25:09,584] INFO [raft-expiration-reaper]: Starting 
(kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
[2022-05-16 11:25:09,784] INFO [RaftManager nodeId=1] Completed transition to 
Unattached(epoch=0, voters=[1], electionTimeoutMs=1442) 
(org.apache.kafka.raft.QuorumState)
[2022-05-16 11:25:09,797] INFO [RaftManager nodeId=1] Completed transition to 
CandidateState(localId=1, epoch=1, retries=1, electionTimeoutMs=1741) 
(org.apache.kafka.raft.QuorumState)
[2022-05-16 11:25:09,810] INFO [RaftManager nodeId=1] Completed transition to 
Leader(localId=1, epoch=1, epochStartOffset=0, highWatermark=Optional.empty, 
voterStates={1=ReplicaState(nodeId=1, endOffset=Optional.empty, 
lastFetchTimestamp=OptionalLong.empty, hasAcknowledgedLeader=true)}) 
(org.apache.kafka.raft.QuorumState)
[2022-05-16 11:25:09,854] INFO Registered signal handlers for TERM, INT, HUP 
(org.apache.kafka.common.utils.LoggingSignalHandler)
[2022-05-16 11:25:09,860] INFO [kafka-raft-outbound-request-thread]: Starting 
(kafka.raft.RaftSendThread)
[2022-05-16 11:25:09,860] INFO [kafka-raft-io-thread]: Starting 
(kafka.raft.KafkaRaftManager$RaftIoThread)
[2022-05-16 11:25:09,862] INFO Starting controller 
(kafka.server.ControllerServer)
[2022-05-16