This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 434646febbaef0e0ff1edd2877036df2d51a2325 Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Sep 8 15:35:53 2025 +0200 MINIFICPP-2622 Replace Bitnami Kafka image with Apache in tests Closes #2023 Signed-off-by: Marton Szasz <[email protected]> --- .../integration/cluster/checkers/KafkaHelper.py | 8 +-- .../cluster/containers/KafkaBrokerContainer.py | 68 +++++++++++++--------- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/docker/test/integration/cluster/checkers/KafkaHelper.py b/docker/test/integration/cluster/checkers/KafkaHelper.py index a876e8bc0..0307e497b 100644 --- a/docker/test/integration/cluster/checkers/KafkaHelper.py +++ b/docker/test/integration/cluster/checkers/KafkaHelper.py @@ -26,19 +26,19 @@ class KafkaHelper: def create_topic(self, container_name: str, topic_name: str): logging.info(f"Creating topic {topic_name} in {container_name}") - (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/bitnami/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {container_name}:9092"]) + (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {container_name}:9092"]) logging.info(output) return code == 0 def produce_message(self, container_name: str, topic_name: str, message: str): logging.info(f"Sending {message} to {container_name}:{topic_name}") - (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/bitnami/kafka/bin/kafka-console-producer.sh --topic {topic_name} --bootstrap-server {container_name}:9092 <<< '{message}'"]) + (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --topic {topic_name} --bootstrap-server {container_name}:9092 <<< '{message}'"]) logging.info(output) return code == 0 def produce_message_with_key(self, container_name: str, topic_name: str, message: str, message_key: str): logging.info(f"Sending {message} to {container_name}:{topic_name}") - (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/bitnami/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic {topic_name} --bootstrap-server {container_name}:9092 <<< '{message_key}:{message}'"]) + (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic {topic_name} --bootstrap-server {container_name}:9092 <<< '{message_key}:{message}'"]) logging.info(output) return code == 0 @@ -51,7 +51,7 @@ class KafkaHelper: RUN pip install confluent-kafka """ dockerfile_stream = io.BytesIO(dockerfile_content.encode("utf-8")) - image, _ = self.container_communicator.client.images.build(fileobj=dockerfile_stream, tag="kafka-helper") + self.container_communicator.client.images.build(fileobj=dockerfile_stream, tag="kafka-helper") output = self.container_communicator.client.containers.run("kafka-helper", ["python", "-c", command], remove=True, stdout=True, stderr=True, network=f"minifi_integration_test_network-{self.feature_id}") logging.info(output) diff --git a/docker/test/integration/cluster/containers/KafkaBrokerContainer.py b/docker/test/integration/cluster/containers/KafkaBrokerContainer.py index 2aacfd0ef..a597792d3 100644 --- a/docker/test/integration/cluster/containers/KafkaBrokerContainer.py +++ b/docker/test/integration/cluster/containers/KafkaBrokerContainer.py @@ -40,6 +40,12 @@ class KafkaBrokerContainer(Container): os.chmod(self.server_keystore_file_path, 0o644) + with tempfile.NamedTemporaryFile(delete=False) as credentials_file: + credentials_file.write(b"abcdefgh") + self.credentials_file_path = credentials_file.name + + os.chmod(self.credentials_file_path, 0o644) + trusted_cert = jks.TrustedCertEntry.new( 'root-ca', # Alias for the certificate crypto.dump_certificate(crypto.FILETYPE_ASN1, feature_context.root_ca_cert) @@ -82,69 +88,75 @@ Client { logging.info('Creating and running kafka broker docker container...') self.client.containers.run( - image="bitnamilegacy/kafka:3.9.0", + image="apache/kafka:4.1.0", detach=True, name=self.name, network=self.network.name, environment=[ - "KAFKA_CFG_NODE_ID=1", - "KAFKA_CFG_PROCESS_ROLES=controller,broker", - "KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT", - "KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER", + "KAFKA_NODE_ID=1", + "KAFKA_PROCESS_ROLES=controller,broker", + "KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT", + "KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER", + "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1", + "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1", + "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1", - f"KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-broker-{self.feature_context.id}:9096", + f"KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka-broker-{self.feature_context.id}:9096", - f"KAFKA_CFG_LISTENERS=PLAINTEXT://kafka-broker-{self.feature_context.id}:9092," + f"KAFKA_LISTENERS=PLAINTEXT://kafka-broker-{self.feature_context.id}:9092," f"SASL_PLAINTEXT://kafka-broker-{self.feature_context.id}:9094," f"SSL://kafka-broker-{self.feature_context.id}:9093," f"SASL_SSL://kafka-broker-{self.feature_context.id}:9095," f"CONTROLLER://kafka-broker-{self.feature_context.id}:9096", - f"KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker-{self.feature_context.id}:9092," + f"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker-{self.feature_context.id}:9092," f"SASL_PLAINTEXT://kafka-broker-{self.feature_context.id}:9094," f"SSL://kafka-broker-{self.feature_context.id}:9093," f"SASL_SSL://kafka-broker-{self.feature_context.id}:9095," f"CONTROLLER://kafka-broker-{self.feature_context.id}:9096", - "KAFKA_CFG_LOG4J_ROOT_LOGLEVEL=DEBUG", - "KAFKA_CFG_LOG4J_LOGGERS=kafka.controller=DEBUG,kafka.server.KafkaApis=DEBUG", - - "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT," + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT," "SASL_PLAINTEXT:SASL_PLAINTEXT," "SSL:SSL," "SASL_SSL:SASL_SSL," "CONTROLLER:PLAINTEXT", # **If using SASL_PLAINTEXT, provide JAAS config** - 'KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN', - 'KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN', - 'KAFKA_OPTS=-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf', - - "KAFKA_CFG_SSL_PROTOCOL=TLS", - "KAFKA_CFG_SSL_ENABLED_PROTOCOLS=TLSv1.2", - "KAFKA_CFG_SSL_KEYSTORE_TYPE=JKS", - "KAFKA_CFG_SSL_KEYSTORE_LOCATION=/bitnami/kafka/config/certs/kafka.keystore.jks", - "KAFKA_CFG_SSL_KEYSTORE_PASSWORD=abcdefgh", - "KAFKA_CFG_SSL_KEY_PASSWORD=abcdefgh", - "KAFKA_CFG_SSL_TRUSTSTORE_TYPE=JKS", - "KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/bitnami/kafka/config/certs/kafka.truststore.jks", - "KAFKA_CFG_SSL_CLIENT_AUTH=none" + 'KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN', + 'KAFKA_SASL_ENABLED_MECHANISMS=PLAIN', + 'KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_jaas.conf -Dlog4j2.rootLogger.level=DEBUG -Dlog4j2.logger.org.apache.kafka.controller.level=DEBUG', + + "KAFKA_SSL_PROTOCOL=TLS", + "KAFKA_SSL_ENABLED_PROTOCOLS=TLSv1.2", + "KAFKA_SSL_KEYSTORE_TYPE=JKS", + "KAFKA_SSL_KEYSTORE_FILENAME=kafka.keystore.jks", + "KAFKA_SSL_KEYSTORE_CREDENTIALS=credentials.conf", + "KAFKA_SSL_KEY_CREDENTIALS=credentials.conf", + "KAFKA_SSL_TRUSTSTORE_CREDENTIALS=credentials.conf", + "KAFKA_SSL_TRUSTSTORE_TYPE=JKS", + "KAFKA_SSL_TRUSTSTORE_FILENAME=kafka.truststore.jks", + "KAFKA_SSL_CLIENT_AUTH=none" ], mounts=[ docker.types.Mount( type='bind', source=self.server_keystore_file_path, - target='/bitnami/kafka/config/certs/kafka.keystore.jks' + target='/etc/kafka/secrets/kafka.keystore.jks' ), docker.types.Mount( type='bind', source=self.server_truststore_file_path, - target='/bitnami/kafka/config/certs/kafka.truststore.jks' + target='/etc/kafka/secrets/kafka.truststore.jks' ), docker.types.Mount( type='bind', source=self.jaas_config_file_path, - target='/opt/bitnami/kafka/config/kafka_jaas.conf' + target='/opt/kafka/config/kafka_jaas.conf' + ), + docker.types.Mount( + type='bind', + source=self.credentials_file_path, + target='/etc/kafka/secrets/credentials.conf' ) ], entrypoint=self.command)
