Copilot commented on code in PR #2059:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2059#discussion_r2533884357


##########
behave_framework/src/minifi_test_framework/containers/minifi_container.py:
##########
@@ -15,34 +15,44 @@
 #  limitations under the License.
 #
 
-from docker.models.networks import Network
+import logging
+from OpenSSL import crypto
 
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
 from minifi_test_framework.containers.file import File
 from minifi_test_framework.minifi.flow_definition import FlowDefinition
+from minifi_test_framework.core.ssl_utils import make_cert_without_extended_usage
 from .container import Container
 
 
 class MinifiContainer(Container):
-    def __init__(self, image_name: str, container_name: str, scenario_id: str, network: Network):
-        super().__init__(image_name, f"{container_name}-{scenario_id}", network)
+    def __init__(self, container_name: str, test_context: MinifiTestContext):
+        super().__init__(test_context.minifi_container_image, f"{container_name}-{test_context.scenario_id}", test_context.network)
         self.flow_config_str: str = ""
         self.flow_definition = FlowDefinition()
         self.properties: dict[str, str] = {}
         self.log_properties: dict[str, str] = {}
 
-        self.is_fhs = 'MINIFI_INSTALLATION_TYPE=FHS' in str(self.client.images.get(image_name).history())
+        minifi_client_cert, minifi_client_key = make_cert_without_extended_usage(common_name=self.container_name, ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key)
+        self.files.append(File("/tmp/resources/root_ca.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert)))
+        self.files.append(File("/tmp/resources/minifi_client.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=minifi_client_cert)))
+        self.files.append(File("/tmp/resources/minifi_client.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=minifi_client_key)))

Review Comment:
   The `File` class now takes a `permissions` parameter, but its usage is inconsistent: some files pass `permissions=0o644` while others do not specify permissions at all. For consistency and security, every file should set explicit, appropriate permissions; sensitive files such as certificates and private keys should typically be `0o600` or `0o400` to prevent unauthorized access.
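
   A minimal sketch of what consistent usage could look like here, assuming the `File(path, content, permissions=...)` signature used elsewhere in this PR (the specific octal values are a suggestion, not part of the original change):
   ```python
   self.files.append(File("/tmp/resources/root_ca.crt",
                          crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert),
                          permissions=0o644))  # public CA certificate may stay world-readable
   self.files.append(File("/tmp/resources/minifi_client.crt",
                          crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=minifi_client_cert),
                          permissions=0o644))
   self.files.append(File("/tmp/resources/minifi_client.key",
                          crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=minifi_client_key),
                          permissions=0o600))  # private key restricted to the owner
   ```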



##########
behave_framework/src/minifi_test_framework/steps/flow_building_steps.py:
##########
@@ -77,7 +78,10 @@ def step_impl(context: MinifiTestContext, processor_type: str):
 @step('the "{property_name}" property of the {processor_name} processor is set to "{property_value}"')
 def step_impl(context: MinifiTestContext, property_name: str, processor_name: str, property_value: str):
     processor = context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name)
-    processor.add_property(property_name, property_value)
+    if property_value == "(not set)":
+        processor.remove_property(property_name)
+    else:
+        processor.add_property(property_name, property_value)

Review Comment:
   [nitpick] Special handling for the `"(not set)"` property value has been added, but the intent may not be obvious: when `property_value == "(not set)"`, the property is removed rather than set. This string-literal comparison could be fragile. Consider documenting the behavior or using an explicit constant (e.g. `PROPERTY_NOT_SET = "(not set)"`) to make the intent clearer and avoid accidental matches.
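
   For illustration, a hedged sketch of the constant-based variant (the constant name is hypothetical, the rest mirrors the diff above):
   ```python
   # Sentinel used by feature files to mean "clear this property" rather than set it.
   PROPERTY_NOT_SET = "(not set)"


   @step('the "{property_name}" property of the {processor_name} processor is set to "{property_value}"')
   def step_impl(context: MinifiTestContext, property_name: str, processor_name: str, property_value: str):
       processor = context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name)
       if property_value == PROPERTY_NOT_SET:
           processor.remove_property(property_name)
       else:
           processor.add_property(property_name, property_value)
   ```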



##########
extensions/kafka/tests/features/steps/kafka_server_container.py:
##########
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+import jks
+
+from OpenSSL import crypto
+from minifi_test_framework.core.helpers import wait_for_condition
+from minifi_test_framework.core.ssl_utils import make_server_cert
+from minifi_test_framework.containers.container import Container
+from minifi_test_framework.containers.file import File
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+
+
+class KafkaServer(Container):
+    def __init__(self, test_context: MinifiTestContext):
+        super().__init__("apache/kafka:4.1.0", 
f"kafka-server-{test_context.scenario_id}", test_context.network)
+        self.environment.append("KAFKA_NODE_ID=1")
+        self.environment.append("KAFKA_PROCESS_ROLES=controller,broker")
+        self.environment.append("KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT")
+        self.environment.append("KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER")
+        self.environment.append("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1")
+        
self.environment.append("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1")
+        self.environment.append("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1")
+        
self.environment.append(f"KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka-server-{test_context.scenario_id}:9096")
+        
self.environment.append(f"KAFKA_LISTENERS=PLAINTEXT://kafka-server-{test_context.scenario_id}:9092,SASL_PLAINTEXT://kafka-server-{test_context.scenario_id}:9094,SSL://kafka-server-{test_context.scenario_id}:9093,SASL_SSL://kafka-server-{test_context.scenario_id}:9095,CONTROLLER://kafka-server-{test_context.scenario_id}:9096")
+        
self.environment.append(f"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-server-{test_context.scenario_id}:9092,SASL_PLAINTEXT://kafka-server-{test_context.scenario_id}:9094,SSL://kafka-server-{test_context.scenario_id}:9093,SASL_SSL://kafka-server-{test_context.scenario_id}:9095,CONTROLLER://kafka-server-{test_context.scenario_id}:9096")
+        
self.environment.append("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,CONTROLLER:PLAINTEXT")
+        
self.environment.append("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN")
+        self.environment.append("KAFKA_SASL_ENABLED_MECHANISMS=PLAIN")
+        
self.environment.append("KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_jaas.conf
 -Dlog4j2.rootLogger.level=DEBUG 
-Dlog4j2.logger.org.apache.kafka.controller.level=DEBUG")
+        self.environment.append("KAFKA_SSL_PROTOCOL=TLS")
+        self.environment.append("KAFKA_SSL_ENABLED_PROTOCOLS=TLSv1.2")
+        self.environment.append("KAFKA_SSL_KEYSTORE_TYPE=JKS")
+        
self.environment.append("KAFKA_SSL_KEYSTORE_FILENAME=kafka.keystore.jks")
+        
self.environment.append("KAFKA_SSL_KEYSTORE_CREDENTIALS=credentials.conf")
+        self.environment.append("KAFKA_SSL_KEY_CREDENTIALS=credentials.conf")
+        
self.environment.append("KAFKA_SSL_TRUSTSTORE_CREDENTIALS=credentials.conf")
+        self.environment.append("KAFKA_SSL_TRUSTSTORE_TYPE=JKS")
+        
self.environment.append("KAFKA_SSL_TRUSTSTORE_FILENAME=kafka.truststore.jks")
+        self.environment.append("KAFKA_SSL_CLIENT_AUTH=none")
+
+        kafka_cert, kafka_key = make_server_cert(self.container_name, 
test_context.root_ca_cert, test_context.root_ca_key)
+
+        pke = jks.PrivateKeyEntry.new('kafka-broker-cert', 
[crypto.dump_certificate(crypto.FILETYPE_ASN1, kafka_cert)], 
crypto.dump_privatekey(crypto.FILETYPE_ASN1, kafka_key), 'rsa_raw')
+        server_keystore = jks.KeyStore.new('jks', [pke])
+        server_keystore_content = server_keystore.saves('abcdefgh')
+        self.files.append(File("/etc/kafka/secrets/kafka.keystore.jks", 
server_keystore_content, permissions=0o644))
+        self.files.append(File("/etc/kafka/secrets/credentials.conf", 
b'abcdefgh', permissions=0o644))
+
+        trusted_cert = jks.TrustedCertEntry.new(
+            'root-ca',  # Alias for the certificate
+            crypto.dump_certificate(crypto.FILETYPE_ASN1, 
test_context.root_ca_cert)
+        )
+
+        # Create a JKS keystore that will serve as a truststore with the 
trusted cert entry.
+        truststore = jks.KeyStore.new('jks', [trusted_cert])
+        truststore_content = truststore.saves('abcdefgh')
+        self.files.append(File("/etc/kafka/secrets/kafka.truststore.jks", 
truststore_content, permissions=0o644))

Review Comment:
   The hardcoded password 'abcdefgh' is used for keystores and truststores. 
This is a security concern even in test environments as it could be exploited 
if these containers are accidentally exposed. Consider using a randomly 
generated password or at least a more complex one, and potentially storing it 
in an environment variable.
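
   A possible sketch, assuming the password only needs to stay consistent within one test scenario (`secrets` is from the standard library; the environment variable name is hypothetical, and the surrounding keystore objects are those built in the diff above):
   ```python
   import os
   import secrets

   # Prefer an externally provided password, otherwise generate a random one per scenario.
   store_password = os.environ.get("KAFKA_TEST_STORE_PASSWORD") or secrets.token_urlsafe(16)

   server_keystore_content = server_keystore.saves(store_password)
   truststore_content = truststore.saves(store_password)
   # Kafka reads the store password from the credentials file, so write the same value there.
   self.files.append(File("/etc/kafka/secrets/credentials.conf", store_password.encode(), permissions=0o600))
   ```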



##########
behave_framework/src/minifi_test_framework/containers/container.py:
##########
@@ -204,6 +232,7 @@ def exited(self) -> bool:
 
     def get_number_of_files(self, directory_path: str) -> int:
         if not self.container or not self.not_empty_dir_exists(directory_path):
+            logging.warning(f"Container not running or directory does not 
exist: {directory_path}")

Review Comment:
   Missing import statement. The code uses `logging.warning()` on line 235, 
`logging.debug()` on line 247, and `logging.error()` on lines 250, 317, and 
329, but `logging` is not imported at the top of this file. This will cause a 
`NameError` at runtime. Add `import logging` to the imports section.
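
   For reference, the fix is a one-line addition to the module's import block (assuming the existing imports in `container.py` stay as they are):
   ```python
   import logging
   ```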



##########
extensions/kafka/tests/features/steps/kafka_server_container.py:
##########
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+import jks
+
+from OpenSSL import crypto
+from minifi_test_framework.core.helpers import wait_for_condition
+from minifi_test_framework.core.ssl_utils import make_server_cert
+from minifi_test_framework.containers.container import Container
+from minifi_test_framework.containers.file import File
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+
+
+class KafkaServer(Container):
+    def __init__(self, test_context: MinifiTestContext):
+        super().__init__("apache/kafka:4.1.0", 
f"kafka-server-{test_context.scenario_id}", test_context.network)
+        self.environment.append("KAFKA_NODE_ID=1")
+        self.environment.append("KAFKA_PROCESS_ROLES=controller,broker")
+        self.environment.append("KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT")
+        self.environment.append("KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER")
+        self.environment.append("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1")
+        
self.environment.append("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1")
+        self.environment.append("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1")
+        
self.environment.append(f"KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka-server-{test_context.scenario_id}:9096")
+        
self.environment.append(f"KAFKA_LISTENERS=PLAINTEXT://kafka-server-{test_context.scenario_id}:9092,SASL_PLAINTEXT://kafka-server-{test_context.scenario_id}:9094,SSL://kafka-server-{test_context.scenario_id}:9093,SASL_SSL://kafka-server-{test_context.scenario_id}:9095,CONTROLLER://kafka-server-{test_context.scenario_id}:9096")
+        
self.environment.append(f"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-server-{test_context.scenario_id}:9092,SASL_PLAINTEXT://kafka-server-{test_context.scenario_id}:9094,SSL://kafka-server-{test_context.scenario_id}:9093,SASL_SSL://kafka-server-{test_context.scenario_id}:9095,CONTROLLER://kafka-server-{test_context.scenario_id}:9096")
+        
self.environment.append("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,CONTROLLER:PLAINTEXT")
+        
self.environment.append("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN")
+        self.environment.append("KAFKA_SASL_ENABLED_MECHANISMS=PLAIN")
+        
self.environment.append("KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_jaas.conf
 -Dlog4j2.rootLogger.level=DEBUG 
-Dlog4j2.logger.org.apache.kafka.controller.level=DEBUG")
+        self.environment.append("KAFKA_SSL_PROTOCOL=TLS")
+        self.environment.append("KAFKA_SSL_ENABLED_PROTOCOLS=TLSv1.2")
+        self.environment.append("KAFKA_SSL_KEYSTORE_TYPE=JKS")
+        
self.environment.append("KAFKA_SSL_KEYSTORE_FILENAME=kafka.keystore.jks")
+        
self.environment.append("KAFKA_SSL_KEYSTORE_CREDENTIALS=credentials.conf")
+        self.environment.append("KAFKA_SSL_KEY_CREDENTIALS=credentials.conf")
+        
self.environment.append("KAFKA_SSL_TRUSTSTORE_CREDENTIALS=credentials.conf")
+        self.environment.append("KAFKA_SSL_TRUSTSTORE_TYPE=JKS")
+        
self.environment.append("KAFKA_SSL_TRUSTSTORE_FILENAME=kafka.truststore.jks")
+        self.environment.append("KAFKA_SSL_CLIENT_AUTH=none")
+
+        kafka_cert, kafka_key = make_server_cert(self.container_name, 
test_context.root_ca_cert, test_context.root_ca_key)
+
+        pke = jks.PrivateKeyEntry.new('kafka-broker-cert', 
[crypto.dump_certificate(crypto.FILETYPE_ASN1, kafka_cert)], 
crypto.dump_privatekey(crypto.FILETYPE_ASN1, kafka_key), 'rsa_raw')
+        server_keystore = jks.KeyStore.new('jks', [pke])
+        server_keystore_content = server_keystore.saves('abcdefgh')
+        self.files.append(File("/etc/kafka/secrets/kafka.keystore.jks", 
server_keystore_content, permissions=0o644))
+        self.files.append(File("/etc/kafka/secrets/credentials.conf", 
b'abcdefgh', permissions=0o644))
+
+        trusted_cert = jks.TrustedCertEntry.new(
+            'root-ca',  # Alias for the certificate
+            crypto.dump_certificate(crypto.FILETYPE_ASN1, 
test_context.root_ca_cert)
+        )
+
+        # Create a JKS keystore that will serve as a truststore with the 
trusted cert entry.
+        truststore = jks.KeyStore.new('jks', [trusted_cert])
+        truststore_content = truststore.saves('abcdefgh')
+        self.files.append(File("/etc/kafka/secrets/kafka.truststore.jks", 
truststore_content, permissions=0o644))
+
+        jaas_config_file_content = """
+KafkaServer {
+  org.apache.kafka.common.security.plain.PlainLoginModule required
+  username="admin"
+  password="admin-secret"
+  user_admin="admin-secret"
+  user_alice="alice-secret";
+};
+
+Client {
+  org.apache.kafka.common.security.plain.PlainLoginModule required
+  username="admin"
+  password="admin-secret";
+};
+"""
+        self.files.append(File("/opt/kafka/config/kafka_jaas.conf", 
jaas_config_file_content, permissions=0o644))
+
+    def deploy(self):
+        super().deploy()
+        finished_str = "Kafka Server started"
+        return wait_for_condition(
+            condition=lambda: finished_str in self.get_logs(),
+            timeout_seconds=15,

Review Comment:
   [nitpick] The timeout for Kafka server startup is set to 15 seconds on line 
96, which may be insufficient for slower test environments or CI/CD systems. 
Kafka can take longer to start, especially with SSL configuration. Consider 
increasing this timeout to at least 30-60 seconds to avoid flaky test failures.
   ```suggestion
               timeout_seconds=60,
   ```
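
   If a fixed value still turns out to be flaky, one hedged option is to make the timeout overridable from the environment; a sketch (the variable name below is hypothetical, and any other arguments of the original `wait_for_condition` call would carry over unchanged):
   ```python
   import os

   # Allow slow CI machines to extend the Kafka startup wait; default to a more generous 60 seconds.
   startup_timeout = int(os.environ.get("KAFKA_STARTUP_TIMEOUT_SECONDS", "60"))
   return wait_for_condition(
       condition=lambda: finished_str in self.get_logs(),
       timeout_seconds=startup_timeout,
   )
   ```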



##########
behave_framework/src/minifi_test_framework/containers/container.py:
##########
@@ -254,6 +285,59 @@ def verify_file_contents(self, directory_path: str, expected_contents: list[str]
 
         return sorted(actual_file_contents) == sorted(expected_contents)
 
+    def _verify_file_contents_in_stopped_container(self, directory_path: str, expected_contents: list[str]) -> bool:
+        if not self.container:
+            return False
+
+        with tempfile.TemporaryDirectory() as temp_dir:
+            extracted_dir = self._extract_directory_from_container(directory_path, temp_dir)
+            if not extracted_dir:
+                return False
+
+            actual_file_contents = self._read_files_from_directory(extracted_dir)
+            if actual_file_contents is None:
+                return False
+
+            return sorted(actual_file_contents) == sorted(expected_contents)
+
+    def _extract_directory_from_container(self, directory_path: str, temp_dir: str) -> str | None:
+        try:
+            bits, _ = self.container.get_archive(directory_path)
+            temp_tar_path = os.path.join(temp_dir, "archive.tar")
+
+            with open(temp_tar_path, 'wb') as f:
+                for chunk in bits:
+                    f.write(chunk)
+
+            with tarfile.open(temp_tar_path) as tar:
+                tar.extractall(path=temp_dir)
+

Review Comment:
   The method `_verify_file_contents_in_stopped_container` extracts files from 
a stopped container using tarfile. However, there's a potential security issue: 
the `tarfile.extractall()` call on line 313 is vulnerable to path traversal 
attacks if the tar archive contains malicious entries with absolute paths or 
`..` sequences. Consider using `tarfile.data_filter` (Python 3.12+) or manually 
validating paths before extraction.
   ```suggestion
                   def is_within_directory(directory, target):
                       abs_directory = os.path.abspath(directory)
                       abs_target = os.path.abspath(target)
                        return os.path.commonpath([abs_directory]) == os.path.commonpath([abs_directory, abs_target])
   
                   for member in tar.getmembers():
                       member_path = os.path.join(temp_dir, member.name)
                       if not is_within_directory(temp_dir, member_path):
                           logging.error(f"Attempted path traversal in tar 
file: {member.name}")
                           continue
                       tar.extract(member, path=temp_dir)
   ```
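
   On Python 3.12+ (and the 3.10/3.11 point releases that backported extraction filters), the same protection can be expressed more compactly; a minimal sketch:
   ```python
   with tarfile.open(temp_tar_path) as tar:
       # The "data" filter rejects absolute paths, ".." components, and other unsafe members.
       tar.extractall(path=temp_dir, filter="data")
   ```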



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
