GitHub user Sammyjroberts edited a discussion: Can't get python function zip to 
load

```
broker:/pulsar/download/pulsar_functions/public/default/sensor-processor/0$ ls
sensor_processor  sensor_processor.zip
```

```
Archive:  sensor_processor.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
        0  05-12-2025 07:28   sensor_processor/
      279  05-12-2025 07:28   sensor_processor/requirements.txt
        0  05-12-2025 07:28   sensor_processor/src/
     4283  05-12-2025 07:28   sensor_processor/src/sensor_processor.py
        0  05-12-2025 07:28   sensor_processor/src/proto/
        0  05-12-2025 07:28   sensor_processor/src/proto/processedsensor/
        0  05-12-2025 07:28   sensor_processor/src/proto/processedsensor/v1/
     3254  05-12-2025 07:28   
sensor_processor/src/proto/processedsensor/v1/processed_sensor_data_pb2.py
      159  05-12-2025 07:28   
sensor_processor/src/proto/processedsensor/v1/processed_sensor_data_pb2_grpc.py
        0  05-12-2025 07:28   sensor_processor/src/proto/rawsensor/
        0  05-12-2025 07:28   sensor_processor/src/proto/rawsensor/v1/
     3764  05-12-2025 07:28   
sensor_processor/src/proto/rawsensor/v1/raw_sensor_data_pb2.py
     5206  05-12-2025 07:28   
sensor_processor/src/proto/rawsensor/v1/raw_sensor_data_pb2_grpc.py
        0  05-12-2025 07:28   sensor_processor/deps/
 --------                     -------
    16945                     14 files
```
    
```py
import time
import random

import json
from typing import Any
from pulsar import Function, Context

# ignore wack imports please
from .proto.processedsensor.v1.processed_sensor_data_pb2 import 
ProcessedSensorData
from .proto.rawsensor.v1.raw_sensor_data_pb2 import RawSensorData


class SensorDataProcessor(Function):
    """
    Pulsar Function that processes raw sensor data and outputs processed data.

    Uses Protocol Buffers for structured data handling.
    """

    def process(self, input: bytes, context: Context):
        try:
            logger = context.get_logger()
            msg_id = context.get_message_id()
            # Parse input protobuf message
            raw_data = RawSensorData()
            raw_data.ParseFromString(input)

            logger.info(f"Processing data from device {raw_data.device_id} with 
message ID {msg_id}")

            # Transform the data using protobuf messages
            processed_data = self.transform_sensor_data(raw_data)

            # Return serialized protobuf for output topic
            return processed_data.SerializeToString()

        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")
            # For errors, we'll still use JSON for the dead letter queue
            # since it might be easier to debug
            error_data: dict[str, Any] = {
                "error": str(e),
                "timestamp": int(time.time() * 1000),
                "message_id": str(context.get_message_id()),
            }
            if raw_data:
                # If we have the device_id, include it in the error
                if hasattr(raw_data, 'device_id') and raw_data.device_id:
                    error_data["device_id"] = raw_data.device_id

            logger.error(f"Error data: {json.dumps(error_data)}")
            raise e
        finally:
            # Always log the processing time
            logger.info(f"Processing time: {context.get_processing_time()} ms")

    def transform_sensor_data(self, raw_data: RawSensorData):
        """Transform raw sensor data into processed format using protocol 
buffers"""
        # Create a new ProcessedSensorData message
        processed_data = ProcessedSensorData()

        # Copy over basic fields
        processed_data.device_id = raw_data.device_id
        processed_data.timestamp = raw_data.timestamp
        processed_data.temperature_celsius = raw_data.temperature

        # Convert temperature from Celsius to Fahrenheit
        processed_data.temperature_fahrenheit = (raw_data.temperature * 9/5) + 
32

        # Normalize humidity to 0-1.0 range (assuming input is 0-100%)
        processed_data.humidity_normalized = raw_data.humidity / 100.0

        # Convert pressure from hPa to kPa
        processed_data.pressure_kpa = raw_data.pressure / 10.0

        # Calculate battery percentage (assuming voltage range 2.8V-4.2V for a 
Li-ion battery)
        min_voltage = 2.8
        max_voltage = 4.2
        processed_data.battery_percentage = max(0, min(100, ((raw_data.voltage 
- min_voltage) / (max_voltage - min_voltage)) * 100))

        # Determine sensor status
        if processed_data.battery_percentage < 10:
            processed_data.status = ProcessedSensorData.STATUS_LOW_BATTERY
        elif raw_data.temperature > 50 or raw_data.temperature < -10:
            processed_data.status = 
ProcessedSensorData.STATUS_TEMPERATURE_WARNING
        elif raw_data.humidity > 95:
            processed_data.status = ProcessedSensorData.STATUS_HUMIDITY_HIGH
        else:
            processed_data.status = ProcessedSensorData.STATUS_NORMAL

        # Simple anomaly detection (random for demonstration purposes)
        if processed_data.status != ProcessedSensorData.STATUS_NORMAL:
            processed_data.anomaly_score = random.uniform(0, 1)
        else:
            processed_data.anomaly_score = random.uniform(0, 0.3)

        # Add processing timestamp
        processed_data.processed_at = int(time.time() * 1000)  # Current time 
in milliseconds

        # Copy over any metadata from raw data
        for key, value in raw_data.metadata.items():
            processed_data.metadata[key] = value

        return processed_data
        ```
        ```
        Traceback (most recent call last):
  File "/pulsar/instances/python-instance/util.py", line 43, in import_class
    return import_class_from_path(from_path, full_class_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/pulsar/instances/python-instance/util.py", line 72, in 
import_class_from_path
    mod = importlib.import_module(classname_path)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1324, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'sensor_processor'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/pulsar/instances/python-instance/util.py", line 48, in import_class
    return import_class_from_path(api_dir, full_class_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/pulsar/instances/python-instance/util.py", line 72, in 
import_class_from_path
    mod = importlib.import_module(classname_path)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1324, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'sensor_processor'
[2025-05-12 07:29:05 +0000] [CRITICAL] python_instance.py: Could not import 
User Function Module sensor_processor.SensorDataProcessor
[2025-05-12 07:29:05 +0000] [ERROR] log.py: Traceback (most recent call last):
[2025-05-12 07:29:05 +0000] [ERROR] log.py:   File 
"/pulsar/instances/python-instance/python_instance_main.py", line 318, in 
<module>
[2025-05-12 07:29:05 +0000] [ERROR] log.py: main()
[2025-05-12 07:29:05 +0000] [ERROR] log.py:   File 
"/pulsar/instances/python-instance/python_instance_main.py", line 299, in main
[2025-05-12 07:29:05 +0000] [ERROR] log.py: pyinstance.run()
[2025-05-12 07:29:05 +0000] [ERROR] log.py:   File 
"/pulsar/instances/python-instance/python_instance.py", line 227, in run
[2025-05-12 07:29:05 +0000] [ERROR] log.py: raise NameError("Could not import 
User Function Module %s" % self.instance_config.function_details.className)
[2025-05-12 07:29:05 +0000] [ERROR] log.py: NameError
[2025-05-12 07:29:05 +0000] [ERROR] log.py: :
[2025-05-12 07:29:05 +0000] [ERROR] log.py: Could not import User Function 
Module sensor_processor.SensorDataProcessor
cat: sensor_processor: Unknown system error
```

I tried uploading a single file, and that was fine, I have also tried not 
having a root folder, no success, including __init__.py files, no luck, 
flattening the structure, no luck.

Any recommendations on what I could be missing?

GitHub link: https://github.com/apache/pulsar/discussions/24287

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to