This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 434646febbaef0e0ff1edd2877036df2d51a2325
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Sep 8 15:35:53 2025 +0200

    MINIFICPP-2622 Replace Bitnami Kafka image with Apache in tests
    
    Closes #2023
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 .../integration/cluster/checkers/KafkaHelper.py    |  8 +--
 .../cluster/containers/KafkaBrokerContainer.py     | 68 +++++++++++++---------
 2 files changed, 44 insertions(+), 32 deletions(-)

diff --git a/docker/test/integration/cluster/checkers/KafkaHelper.py b/docker/test/integration/cluster/checkers/KafkaHelper.py
index a876e8bc0..0307e497b 100644
--- a/docker/test/integration/cluster/checkers/KafkaHelper.py
+++ b/docker/test/integration/cluster/checkers/KafkaHelper.py
@@ -26,19 +26,19 @@ class KafkaHelper:
 
     def create_topic(self, container_name: str, topic_name: str):
         logging.info(f"Creating topic {topic_name} in {container_name}")
-        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/bitnami/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {container_name}:9092"])
+        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {container_name}:9092"])
         logging.info(output)
         return code == 0
 
     def produce_message(self, container_name: str, topic_name: str, message: str):
         logging.info(f"Sending {message} to {container_name}:{topic_name}")
-        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/bitnami/kafka/bin/kafka-console-producer.sh --topic {topic_name} --bootstrap-server {container_name}:9092 <<< '{message}'"])
+        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --topic {topic_name} --bootstrap-server {container_name}:9092 <<< '{message}'"])
         logging.info(output)
         return code == 0
 
     def produce_message_with_key(self, container_name: str, topic_name: str, message: str, message_key: str):
         logging.info(f"Sending {message} to {container_name}:{topic_name}")
-        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/bitnami/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic {topic_name} --bootstrap-server {container_name}:9092 <<< '{message_key}:{message}'"])
+        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic {topic_name} --bootstrap-server {container_name}:9092 <<< '{message_key}:{message}'"])
         logging.info(output)
         return code == 0
 
@@ -51,7 +51,7 @@ class KafkaHelper:
             RUN pip install confluent-kafka
             """
             dockerfile_stream = io.BytesIO(dockerfile_content.encode("utf-8"))
-            image, _ = self.container_communicator.client.images.build(fileobj=dockerfile_stream, tag="kafka-helper")
+            self.container_communicator.client.images.build(fileobj=dockerfile_stream, tag="kafka-helper")
 
         output = self.container_communicator.client.containers.run("kafka-helper", ["python", "-c", command], remove=True, stdout=True, stderr=True, network=f"minifi_integration_test_network-{self.feature_id}")
         logging.info(output)
diff --git a/docker/test/integration/cluster/containers/KafkaBrokerContainer.py b/docker/test/integration/cluster/containers/KafkaBrokerContainer.py
index 2aacfd0ef..a597792d3 100644
--- a/docker/test/integration/cluster/containers/KafkaBrokerContainer.py
+++ b/docker/test/integration/cluster/containers/KafkaBrokerContainer.py
@@ -40,6 +40,12 @@ class KafkaBrokerContainer(Container):
 
         os.chmod(self.server_keystore_file_path, 0o644)
 
+        with tempfile.NamedTemporaryFile(delete=False) as credentials_file:
+            credentials_file.write(b"abcdefgh")
+            self.credentials_file_path = credentials_file.name
+
+        os.chmod(self.credentials_file_path, 0o644)
+
         trusted_cert = jks.TrustedCertEntry.new(
             'root-ca',  # Alias for the certificate
             crypto.dump_certificate(crypto.FILETYPE_ASN1, feature_context.root_ca_cert)
@@ -82,69 +88,75 @@ Client {
 
         logging.info('Creating and running kafka broker docker container...')
         self.client.containers.run(
-            image="bitnamilegacy/kafka:3.9.0",
+            image="apache/kafka:4.1.0",
             detach=True,
             name=self.name,
             network=self.network.name,
             environment=[
-                "KAFKA_CFG_NODE_ID=1",
-                "KAFKA_CFG_PROCESS_ROLES=controller,broker",
-                "KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT",
-                "KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER",
+                "KAFKA_NODE_ID=1",
+                "KAFKA_PROCESS_ROLES=controller,broker",
+                "KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT",
+                "KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER",
+                "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1",
+                "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1",
+                "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1",
 
-                f"KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-broker-{self.feature_context.id}:9096",
+                f"KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka-broker-{self.feature_context.id}:9096",
 
-                f"KAFKA_CFG_LISTENERS=PLAINTEXT://kafka-broker-{self.feature_context.id}:9092,"
+                f"KAFKA_LISTENERS=PLAINTEXT://kafka-broker-{self.feature_context.id}:9092,"
                 f"SASL_PLAINTEXT://kafka-broker-{self.feature_context.id}:9094,"
                 f"SSL://kafka-broker-{self.feature_context.id}:9093,"
                 f"SASL_SSL://kafka-broker-{self.feature_context.id}:9095,"
                 f"CONTROLLER://kafka-broker-{self.feature_context.id}:9096",
 
-                f"KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker-{self.feature_context.id}:9092,"
+                f"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker-{self.feature_context.id}:9092,"
                 f"SASL_PLAINTEXT://kafka-broker-{self.feature_context.id}:9094,"
                 f"SSL://kafka-broker-{self.feature_context.id}:9093,"
                 f"SASL_SSL://kafka-broker-{self.feature_context.id}:9095,"
                 f"CONTROLLER://kafka-broker-{self.feature_context.id}:9096",
 
-                "KAFKA_CFG_LOG4J_ROOT_LOGLEVEL=DEBUG",
-                "KAFKA_CFG_LOG4J_LOGGERS=kafka.controller=DEBUG,kafka.server.KafkaApis=DEBUG",
-
-                "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,"
+                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,"
                 "SASL_PLAINTEXT:SASL_PLAINTEXT,"
                 "SSL:SSL,"
                 "SASL_SSL:SASL_SSL,"
                 "CONTROLLER:PLAINTEXT",
 
                 # **If using SASL_PLAINTEXT, provide JAAS config**
-                'KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN',
-                'KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN',
-                'KAFKA_OPTS=-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf',
-
-                "KAFKA_CFG_SSL_PROTOCOL=TLS",
-                "KAFKA_CFG_SSL_ENABLED_PROTOCOLS=TLSv1.2",
-                "KAFKA_CFG_SSL_KEYSTORE_TYPE=JKS",
-                "KAFKA_CFG_SSL_KEYSTORE_LOCATION=/bitnami/kafka/config/certs/kafka.keystore.jks",
-                "KAFKA_CFG_SSL_KEYSTORE_PASSWORD=abcdefgh",
-                "KAFKA_CFG_SSL_KEY_PASSWORD=abcdefgh",
-                "KAFKA_CFG_SSL_TRUSTSTORE_TYPE=JKS",
-                "KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/bitnami/kafka/config/certs/kafka.truststore.jks",
-                "KAFKA_CFG_SSL_CLIENT_AUTH=none"
+                'KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN',
+                'KAFKA_SASL_ENABLED_MECHANISMS=PLAIN',
+                'KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_jaas.conf -Dlog4j2.rootLogger.level=DEBUG -Dlog4j2.logger.org.apache.kafka.controller.level=DEBUG',
+
+                "KAFKA_SSL_PROTOCOL=TLS",
+                "KAFKA_SSL_ENABLED_PROTOCOLS=TLSv1.2",
+                "KAFKA_SSL_KEYSTORE_TYPE=JKS",
+                "KAFKA_SSL_KEYSTORE_FILENAME=kafka.keystore.jks",
+                "KAFKA_SSL_KEYSTORE_CREDENTIALS=credentials.conf",
+                "KAFKA_SSL_KEY_CREDENTIALS=credentials.conf",
+                "KAFKA_SSL_TRUSTSTORE_CREDENTIALS=credentials.conf",
+                "KAFKA_SSL_TRUSTSTORE_TYPE=JKS",
+                "KAFKA_SSL_TRUSTSTORE_FILENAME=kafka.truststore.jks",
+                "KAFKA_SSL_CLIENT_AUTH=none"
             ],
             mounts=[
                 docker.types.Mount(
                     type='bind',
                     source=self.server_keystore_file_path,
-                    target='/bitnami/kafka/config/certs/kafka.keystore.jks'
+                    target='/etc/kafka/secrets/kafka.keystore.jks'
                 ),
                 docker.types.Mount(
                     type='bind',
                     source=self.server_truststore_file_path,
-                    target='/bitnami/kafka/config/certs/kafka.truststore.jks'
+                    target='/etc/kafka/secrets/kafka.truststore.jks'
                 ),
                 docker.types.Mount(
                     type='bind',
                     source=self.jaas_config_file_path,
-                    target='/opt/bitnami/kafka/config/kafka_jaas.conf'
+                    target='/opt/kafka/config/kafka_jaas.conf'
+                ),
+                docker.types.Mount(
+                    type='bind',
+                    source=self.credentials_file_path,
+                    target='/etc/kafka/secrets/credentials.conf'
                 )
             ],
             entrypoint=self.command)

Reply via email to