This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 6ba21540 chore(python): refactor Python SDK for compatibility with Rust SDK (#1853)
6ba21540 is described below
commit 6ba2154056e24c3028cebdbecd5e8781b6c2688f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Jun 22 11:57:56 2025 +0200
chore(python): refactor Python SDK for compatibility with Rust SDK (#1853)
This commit updates the Python SDK to work with the latest version
of the Rust SDK. It includes changes to the CI configuration and
various updates to the codebase to ensure compatibility and improve
functionality.
- Updated `.github/changed-files-config.json` file patterns
- Modified `ci-check-python-sdk.yml` to have working CI
- Added new dependencies to `Cargo.lock` and `Cargo.toml`
- Refactored Python SDK code to align with the latest Rust SDK
- Enhanced testing setup with Docker and updated test scripts
- Improved documentation and contrib guidelines in `CONTRIBUTING.md`
- Updated `pyproject.toml` with new dependencies and configurations
- Added new test cases and refactored existing ones
---
.dockerignore | 2 -
.github/changed-files-config.json | 5 +
.github/workflows/ci-check-pr.yml | 2 +-
.github/workflows/ci-check-python-sdk.yml | 27 ++-
.gitignore | 1 -
Cargo.toml | 1 +
Dockerfile | 6 +-
Dockerfile => Dockerfile.debug | 16 +-
core/sdk/src/prelude.rs | 6 +-
foreign/python/.gitignore | 1 +
foreign/python/CONTRIBUTING.md | 46 ----
foreign/python/Cargo.toml | 18 +-
foreign/python/Dockerfile.test | 73 ++++++
foreign/python/README.md | 58 +++--
foreign/python/docker-compose.test.yml | 67 ++++++
foreign/python/pyproject.toml | 82 ++++++-
foreign/python/scripts/test.sh | 107 +++++++++
foreign/python/src/client.rs | 17 +-
foreign/python/src/lib.rs | 3 +-
foreign/python/src/receive_message.rs | 40 +---
foreign/python/src/send_message.rs | 24 +-
foreign/python/src/stream.rs | 2 +-
foreign/python/src/topic.rs | 2 +-
foreign/python/tests/conftest.py | 186 ++++++++++-----
foreign/python/tests/test_client.py | 57 -----
foreign/python/tests/test_iggy_sdk.py | 368 ++++++++++++++++++++++++++++++
26 files changed, 937 insertions(+), 280 deletions(-)
diff --git a/.dockerignore b/.dockerignore
index 967a690f..0caab147 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -8,5 +8,3 @@
/local_data
/scripts
/target
-Dockerfile
-docker-compose.yml
diff --git a/.github/changed-files-config.json b/.github/changed-files-config.json
index ec91298b..5aa6ea9b 100644
--- a/.github/changed-files-config.json
+++ b/.github/changed-files-config.json
@@ -32,6 +32,11 @@
"foreign/python/.*\\.rs",
"foreign/python/pyproject.toml",
"foreign/python/Cargo.toml",
+ "foreign/python/docker-compose.test.yml",
+ "foreign/python/Dockerfile.test",
+ "foreign/python/.*\\.toml",
+ "foreign/python/.*\\.ini",
+ "foreign/python/.*\\.md",
".github/workflows/ci-check-python-sdk.yml"
],
"node-sdk": [
diff --git a/.github/workflows/ci-check-pr.yml b/.github/workflows/ci-check-pr.yml
index 3e74b718..5f2f6ca3 100644
--- a/.github/workflows/ci-check-pr.yml
+++ b/.github/workflows/ci-check-pr.yml
@@ -38,7 +38,7 @@
#
# The workflow fails if any mandatory job fails.
# Workflow can be triggered manually or on pull request events.
-#
+
name: ci-check-pr
on:
diff --git a/.github/workflows/ci-check-python-sdk.yml b/.github/workflows/ci-check-python-sdk.yml
index ced518ff..49b7d38d 100644
--- a/.github/workflows/ci-check-python-sdk.yml
+++ b/.github/workflows/ci-check-python-sdk.yml
@@ -45,9 +45,14 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: '3.10'
- - name: Install dependencies
- working-directory: foreign/python
- run: pip install ".[testing]"
+ - name: Cache Rust dependencies
+ uses: Swatinem/rust-cache@v2
+ with:
+ workspaces: |
+ . -> target
+ foreign/python -> target
+ cache-on-failure: true
+ key: python-${{ matrix.target }}
- name: Build wheels
uses: PyO3/maturin-action@v1
with:
@@ -57,9 +62,18 @@ jobs:
manylinux: '2_28'
args: --release --out dist --find-interpreter
sccache: 'true'
+ - name: Set up Docker Buildx
+ if: matrix.target == 'x86_64'
+ uses: docker/setup-buildx-action@v3
- name: Run tests
+ if: matrix.target == 'x86_64'
working-directory: foreign/python
- run: pytest
+ run: |
+ # Run tests using Docker Compose with caching
+ DOCKER_BUILDKIT=1 docker compose -f docker-compose.test.yml up --build --abort-on-container-exit --exit-code-from python-tests
+
+ # Clean up
+ docker compose -f docker-compose.test.yml down -v
- name: Upload wheels
uses: actions/upload-artifact@v4
with:
@@ -119,9 +133,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- - name: Install dependencies
- working-directory: foreign/python
- run: pip install ".[testing]"
- name: Build sdist
uses: PyO3/maturin-action@v1
with:
@@ -151,4 +162,4 @@ jobs:
MATURIN_PYPI_TOKEN: ${{ secrets.PYPI_API_TOKEN }}
with:
command: upload
- args: --non-interactive --skip-existing *
\ No newline at end of file
+ args: --non-interactive --skip-existing *
diff --git a/.gitignore b/.gitignore
index 2d912c9b..806fdb39 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,6 +34,5 @@ foreign/csharp/**/*.DotSettings.user
*.a
*.lib
*.out
-*.test
go.work
core/bench/dashboard/frontend/dist
diff --git a/Cargo.toml b/Cargo.toml
index a091758e..71715d67 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -42,6 +42,7 @@ members = [
"core/tools",
"examples/rust",
]
+exclude = ["foreign/python"]
resolver = "2"
[workspace.dependencies]
diff --git a/Dockerfile b/Dockerfile
index 7f7ca4bf..641846b2 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -21,7 +21,11 @@ COPY . /build
RUN cargo build --bin iggy --release
RUN cargo build --bin iggy-server --release
-FROM gcr.io/distroless/cc
+FROM debian:bookworm-slim
+RUN apt-get update && apt-get install -y \
+ ca-certificates \
+ liblzma5 \
+ && rm -rf /var/lib/apt/lists/*
COPY ./core/configs ./configs
COPY --from=builder /build/target/release/iggy .
COPY --from=builder /build/target/release/iggy-server .
diff --git a/Dockerfile b/Dockerfile.debug
similarity index 69%
copy from Dockerfile
copy to Dockerfile.debug
index 7f7ca4bf..4bcbc13a 100644
--- a/Dockerfile
+++ b/Dockerfile.debug
@@ -15,15 +15,19 @@
# specific language governing permissions and limitations
# under the License.
+# Debug/test build for faster compilation during development and testing
FROM rust:latest as builder
WORKDIR /build
COPY . /build
-RUN cargo build --bin iggy --release
-RUN cargo build --bin iggy-server --release
+# Build in debug mode for faster compilation times during testing
+RUN cargo build --bin iggy-server
-FROM gcr.io/distroless/cc
+FROM debian:bookworm-slim
+RUN apt-get update && apt-get install -y \
+ ca-certificates \
+ liblzma5 \
+ && rm -rf /var/lib/apt/lists/*
COPY ./core/configs ./configs
-COPY --from=builder /build/target/release/iggy .
-COPY --from=builder /build/target/release/iggy-server .
+COPY --from=builder /build/target/debug/iggy-server .
-CMD ["/iggy-server"]
+CMD ["/iggy-server"]
\ No newline at end of file
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index b3c85cbe..99286fac 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -58,9 +58,9 @@ pub use iggy_common::{
IggyTimestamp, MaxTopicSize, Partition, Partitioner, Partitioning,
Permissions,
PersonalAccessTokenExpiry, PollMessages, PolledMessages, PollingKind,
PollingStrategy,
QuicClientConfig, QuicClientConfigBuilder, QuicClientReconnectionConfig,
SendMessages,
- Sizeable, SnapshotCompression, Stats, Stream, StreamPermissions,
SystemSnapshotType,
- TcpClientConfig, TcpClientConfigBuilder, TcpClientReconnectionConfig,
Topic, TopicPermissions,
- UserId, UserStatus, Validatable, defaults, locking,
+ Sizeable, SnapshotCompression, Stats, Stream, StreamDetails,
StreamPermissions,
+ SystemSnapshotType, TcpClientConfig, TcpClientConfigBuilder,
TcpClientReconnectionConfig,
+ Topic, TopicDetails, TopicPermissions, UserId, UserStatus, Validatable,
defaults, locking,
};
pub use iggy_common::{
IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADER_SIZE,
diff --git a/foreign/python/.gitignore b/foreign/python/.gitignore
index 6e90002d..5e0072e3 100644
--- a/foreign/python/.gitignore
+++ b/foreign/python/.gitignore
@@ -40,6 +40,7 @@ htmlcov/
.cache
nosetests.xml
coverage.xml
+test-results/
# Translations
*.mo
diff --git a/foreign/python/CONTRIBUTING.md b/foreign/python/CONTRIBUTING.md
deleted file mode 100644
index 6ff9699e..00000000
--- a/foreign/python/CONTRIBUTING.md
+++ /dev/null
@@ -1,46 +0,0 @@
-# Contributing to Apache Iggy
-
-This repository provides a Python library powered by Rust using `pyo3`. It
also utilizes Docker for server deployment.
-
-## Quick Start
-
-### 1. Run the Server with Docker
-
-Ensure you have Docker installed on your system. Then, execute the following
command:
-
-```bash
-docker run --rm -p 8080:8080 -p 3000:3000 -p 8090:8090 iggyrs/iggy:0.4.21
-```
-
-This command runs the server and maps the specified ports to your local
machine.
-
-### 2. Install loguru
-
-Loguru is advanced library used for logging information about procceses.
Install it with:
-
-```bash
-pip install loguru
-```
-
-### 3. Install Maturin
-
-Maturin is used for building Rust binaries for Python. Install it with:
-
-```bash
-pip install maturin
-```
-
-### 4. Build and Install the pyo3 Library
-
-Navigate to your library's root directory and execute:
-
-```bash
-python -m venv .venv && source .venv/bin/activate
-maturin develop
-```
-
-This will build the Rust library and make it available for Python.
-
-### 5. Running the Examples
-
-Go to [python_examples/README.md](python_examples/README.md) for instructions
on running the examples.
diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index 9c4b3493..1d426027 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -6,21 +6,21 @@ authors = ["Dario Lencina Talarico
<[email protected]>"]
license = "Apache-2.0"
description = "Apache Iggy is the persistent message streaming platform
written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of
processing millions of messages per second."
documentation = "https://docs.iggy.rs"
-repository = "https://github.com/iggy-rs/iggy"
+repository = "https://github.com/apache/iggy"
-[workspace]
+[dependencies]
+iggy = { path = "../../core/sdk", version = "0.7.0" }
+pyo3 = "0.25.0"
+pyo3-async-runtimes = { version = "0.25.0", features = [
+ "attributes",
+ "tokio-runtime",
+] }
+pyo3-stub-gen = "0.9.1"
-# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "iggy_py"
crate-type = ["cdylib", "rlib"]
-[dependencies]
-pyo3 = "0.25.0"
-iggy = "0.6.201"
-pyo3-async-runtimes = { version = "0.25.0", features = ["attributes",
"tokio-runtime"] }
-pyo3-stub-gen = "0.8.2"
-
[[bin]]
name = "stub_gen"
path = "src/bin/stub_gen.rs"
diff --git a/foreign/python/Dockerfile.test b/foreign/python/Dockerfile.test
new file mode 100644
index 00000000..9983f7ec
--- /dev/null
+++ b/foreign/python/Dockerfile.test
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Multi-stage build for Python SDK testing
+FROM python:3.11-slim AS base
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y \
+ build-essential \
+ curl \
+ && rm -rf /var/lib/apt/lists/*
+
+# Install Rust for maturin
+RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
+ENV PATH="/root/.cargo/bin:${PATH}"
+
+# Set working directory
+WORKDIR /workspace
+
+# Copy dependency files first for better layer caching
+COPY Cargo.toml Cargo.lock ./
+COPY foreign/python/Cargo.toml ./foreign/python/
+COPY foreign/python/pyproject.toml ./foreign/python/
+COPY foreign/python/README.md ./foreign/python/
+COPY foreign/python/LICENSE ./foreign/python/
+COPY foreign/python/NOTICE ./foreign/python/
+
+# Install build dependencies
+WORKDIR /workspace/foreign/python
+RUN pip install --no-cache-dir maturin
+
+# Copy core dependencies (changes less frequently)
+COPY core/ /workspace/core/
+
+# Copy Python SDK source (changes more frequently)
+COPY foreign/python/src/ ./src/
+
+# Build Python SDK and install wheel
+RUN maturin build
+RUN find target/wheels/ -name "*.whl" -exec pip install {} \;
+
+# Install test dependencies from pyproject.toml
+RUN pip install --no-cache-dir -e ".[testing,examples]"
+
+# Copy test files
+COPY foreign/python/tests/ ./tests/
+COPY foreign/python/python_examples/ ./examples/
+
+# Create test script
+COPY foreign/python/scripts/test.sh ./scripts/test.sh
+RUN chmod +x ./scripts/test.sh
+
+# Set environment variables
+ENV PYTHONPATH=/workspace/foreign/python
+ENV IGGY_SERVER_HOST=iggy-server
+ENV IGGY_SERVER_TCP_PORT=8090
+
+# Default command runs tests
+CMD ["./scripts/test.sh"]
diff --git a/foreign/python/README.md b/foreign/python/README.md
index b368abd4..c3ff44ae 100644
--- a/foreign/python/README.md
+++ b/foreign/python/README.md
@@ -6,12 +6,26 @@ Apache Iggy is the persistent message streaming platform
written in Rust, suppor
## Installation
-To install `iggy`, use pip:
+### Basic Installation
```bash
pip install iggy-py
```
+### Development Installation
+
+For testing:
+
+```bash
+pip install -e ".[testing]"
+```
+
+For development with all tools:
+
+```bash
+pip install -e ".[dev,testing,examples]"
+```
+
### Supported Python Versions
- Python 3.7+
@@ -21,41 +35,41 @@ pip install iggy-py
All examples rely on a running iggy server. To start the server, execute:
```bash
-docker run --rm -p 8080:8080 -p 3000:3000 -p 8090:8090 iggyrs/iggy:0.4.21
+# Using latest version
+docker run --rm -p 8080:8080 -p 3000:3000 -p 8090:8090 apache/iggy:latest
+
+# Or build from source (recommended for development)
+cd ../../ && cargo run --bin iggy-server
```
-## Generating Stub Files
+## Testing
-To generate a stub file, execute the following command:
+### Quick Test
```bash
-cargo run --bin stub_gen
+# Run tests with Docker (recommended)
+docker compose -f docker-compose.test.yml up --build
```
-Refer to the python_examples directory for examples on how to use the iggy
library.
+### Local Development
-## Running the Examples
+```bash
+# Install dependencies and build
+pip install -e ".[testing]"
+maturin develop
-Go to [python_examples/README.md](python_examples/README.md) for instructions
on running the examples.
+# Run tests (requires iggy-server running)
+pytest tests/ -v
+```
-## API Reference
+## Examples
-For detailed documentation, visit [Apache Iggy's official
Docs](https://docs.iggy.rs/).
+Refer to the [python_examples/](python_examples/) directory for usage examples.
## Contributing
-Contributions are welcome! Please:
-
-1. Fork the repository on GitHub.
-2. Create an issue for any bugs or features you'd like to address.
-3. Submit pull requests following our code style guidelines.
-
-For more details, see the [Developer README](CONTRIBUTING.md).
+See [CONTRIBUTING.md](CONTRIBUTING.md) for development setup and guidelines.
## License
-Apache iggy is distributed under the Apache 2.0 License. See
[LICENSE](LICENSE) for terms and conditions.
-
-## Contact Information
-
-For questions, suggestions, or issues, contact the developers at [your email
address] or raise an issue on GitHub.
+Licensed under the Apache License 2.0. See [LICENSE](LICENSE) for details.
diff --git a/foreign/python/docker-compose.test.yml b/foreign/python/docker-compose.test.yml
new file mode 100644
index 00000000..490aa5c7
--- /dev/null
+++ b/foreign/python/docker-compose.test.yml
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+ iggy-server:
+ build:
+ context: ../..
+ # Use debug build for faster compilation during testing
+ dockerfile: Dockerfile.debug
+ container_name: iggy-server-python-test
+ networks:
+ - python-test-network
+ ports:
+ - "3000:3000"
+ - "8080:8080"
+ - "8090:8090"
+ healthcheck:
+ test: ["CMD-SHELL", "timeout 5 bash -c '</dev/tcp/localhost/8090' || exit 1"]
+ interval: 5s
+ timeout: 5s
+ retries: 12
+ start_period: 10s
+ volumes:
+ - iggy-data:/local_data
+
+ python-tests:
+ build:
+ context: ../..
+ dockerfile: foreign/python/Dockerfile.test
+ container_name: python-sdk-tests
+ depends_on:
+ iggy-server:
+ condition: service_healthy
+ networks:
+ - python-test-network
+ environment:
+ - IGGY_SERVER_HOST=iggy-server
+ - IGGY_SERVER_TCP_PORT=8090
+ - IGGY_SERVER_HTTP_PORT=3000
+ - IGGY_SERVER_QUIC_PORT=8080
+ - PYTHONPATH=/workspace/foreign/python
+ - PYTEST_ARGS=-v --tb=short
+ volumes:
+ - ./test-results:/workspace/foreign/python/test-results
+
+networks:
+ python-test-network:
+ name: python-test-network
+
+volumes:
+ iggy-data:
diff --git a/foreign/python/pyproject.toml b/foreign/python/pyproject.toml
index bba4b030..ac1bb550 100644
--- a/foreign/python/pyproject.toml
+++ b/foreign/python/pyproject.toml
@@ -6,23 +6,95 @@ build-backend = "maturin"
name = "iggy_py"
requires-python = ">=3.7"
version = "0.4.0"
+description = "Apache Iggy is the persistent message streaming platform
written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of
processing millions of messages per second."
+readme = "README.md"
+license = { file = "LICENSE" }
+authors = [
+ { name = "Dario Lencina Talarico", email =
"[email protected]" },
+]
+keywords = ["streaming", "messaging", "pubsub", "iggy", "rust", "performance"]
classifiers = [
+ "Development Status :: 4 - Beta",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: Apache Software License",
+ "Operating System :: OS Independent",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.7",
+ "Programming Language :: Python :: 3.8",
+ "Programming Language :: Python :: 3.9",
+ "Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ "Topic :: System :: Networking",
+ "Topic :: System :: Distributed Computing",
]
-description= "Apache Iggy is the persistent message streaming platform written
in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of
processing millions of messages per second."
+
+[project.urls]
+"Homepage" = "https://iggy.rs"
+"Documentation" = "https://docs.iggy.rs"
+"Repository" = "https://github.com/iggy-rs/iggy"
+"Bug Tracker" = "https://github.com/iggy-rs/iggy/issues"
+"Discord" = "https://iggy.rs/discord"
[tool.maturin]
features = ["pyo3/extension-module"]
[project.optional-dependencies]
+# Core testing dependencies
testing = [
- "pytest",
- "pytest-asyncio",
- "testcontainers[docker]",
- "maturin"
+ "pytest>=7.0,<9.0",
+ "pytest-asyncio>=0.21.0,<1.0",
+ "pytest-xdist>=3.0,<4.0",
+ "pytest-timeout>=2.0,<3.0",
+]
+
+# For examples and demos
+examples = ["loguru>=0.7.0,<1.0"]
+
+# For containerized testing (CI/CD)
+testing-docker = ["testcontainers[docker]>=3.7.0,<5.0"]
+
+# Development tools
+dev = [
+ "maturin>=1.2,<2.0",
+ "black>=23.0,<25.0",
+ "isort>=5.12.0,<6.0",
+ "mypy>=1.5.0,<2.0",
+ "ruff>=0.1.0,<1.0",
+]
+
+# All dependencies for full development setup
+all = [
+ "pytest>=7.0,<9.0",
+ "pytest-asyncio>=0.21.0,<1.0",
+ "pytest-xdist>=3.0,<4.0",
+ "pytest-timeout>=2.0,<3.0",
+ "loguru>=0.7.0,<1.0",
+ "testcontainers[docker]>=3.7.0,<5.0",
+ "maturin>=1.2,<2.0",
+ "black>=23.0,<25.0",
+ "isort>=5.12.0,<6.0",
+ "mypy>=1.5.0,<2.0",
+ "ruff>=0.1.0,<1.0",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
+asyncio_default_fixture_loop_scope = "session"
+testpaths = ["tests"]
+python_files = ["test_*.py"]
+python_classes = ["Test*"]
+python_functions = ["test_*"]
+addopts = ["-v", "--strict-markers", "--tb=short", "--disable-warnings"]
+markers = [
+ "integration: Integration tests that require a running server",
+ "unit: Fast unit tests",
+]
+filterwarnings = [
+ "ignore::DeprecationWarning",
+ "ignore::pytest.PytestUnraisableExceptionWarning",
+]
diff --git a/foreign/python/scripts/test.sh b/foreign/python/scripts/test.sh
new file mode 100755
index 00000000..d931bfac
--- /dev/null
+++ b/foreign/python/scripts/test.sh
@@ -0,0 +1,107 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -e
+
+echo "๐ Python SDK Test Runner"
+echo "========================="
+
+# Wait for server to be ready
+echo "⏳ Waiting for Iggy server to be ready..."
+timeout 60 bash -c "
+ until timeout 5 bash -c '</dev/tcp/\${IGGY_SERVER_HOST}/\${IGGY_SERVER_TCP_PORT}'; do
+ echo ' Server not ready, waiting...'
+ sleep 2
+ done
+"
+echo "✅ Server is ready!"
+
+# Test connection
+echo "๐ Testing basic connectivity..."
+
+# Resolve hostname to IP address for Rust client compatibility
+SERVER_IP=$(getent hosts ${IGGY_SERVER_HOST} | awk '{ print $1 }' | head -n1)
+if [ -z "$SERVER_IP" ]; then
+ echo "❌ Could not resolve hostname: ${IGGY_SERVER_HOST}"
+ exit 1
+fi
+echo "๐ Resolved ${IGGY_SERVER_HOST} to ${SERVER_IP}"
+
+python3 -c "
+import asyncio
+import sys
+from iggy_py import IggyClient
+
+async def test_connection():
+ try:
+ client = IggyClient('${SERVER_IP}:${IGGY_SERVER_TCP_PORT}')
+ await client.connect()
+ await client.login_user('iggy', 'iggy')
+ await client.ping()
+ print('✅ Connection test passed')
+ return True
+ except Exception as e:
+ print(f'❌ Connection test failed: {e}')
+ return False
+
+result = asyncio.run(test_connection())
+sys.exit(0 if result else 1)
+"
+
+if [ $? -ne 0 ]; then
+ echo "❌ Connection test failed, aborting tests"
+ exit 1
+fi
+
+# Create test results directory
+mkdir -p test-results
+
+# Run tests with detailed output
+echo "🧪 Running Python SDK tests..."
+python3 -m pytest \
+ ${PYTEST_ARGS:-"-v --tb=short"} \
+ --junit-xml=test-results/pytest.xml \
+ tests/
+
+TEST_EXIT_CODE=$?
+
+if [ $TEST_EXIT_CODE -eq 0 ]; then
+ echo "✅ All tests passed!"
+else
+ echo "❌ Some tests failed (exit code: $TEST_EXIT_CODE)"
+fi
+
+# Run examples if tests pass
+if [ $TEST_EXIT_CODE -eq 0 ] && [ "${RUN_EXAMPLES:-false}" = "true" ]; then
+ echo "๐ Running example scripts..."
+
+ # Note: Examples might run indefinitely, so we'll just test they start correctly
+ timeout 10 python3 examples/producer.py &
+ PRODUCER_PID=$!
+ sleep 5
+
+ if kill -0 $PRODUCER_PID 2>/dev/null; then
+ echo "✅ Producer example started successfully"
+ kill $PRODUCER_PID
+ else
+ echo "❌ Producer example failed to start"
+ fi
+fi
+
+echo "๐ Test results saved to test-results/"
+exit $TEST_EXIT_CODE
\ No newline at end of file
diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs
index 993a3adb..57baafcf 100644
--- a/foreign/python/src/client.rs
+++ b/foreign/python/src/client.rs
@@ -19,22 +19,15 @@
use std::str::FromStr;
use std::sync::Arc;
-use iggy::client::{Client, MessageClient, StreamClient, UserClient};
-use iggy::client::{SystemClient, TopicClient};
-use iggy::clients::builder::IggyClientBuilder;
-use iggy::clients::client::IggyClient as RustIggyClient;
-use iggy::compression::compression_algorithm::CompressionAlgorithm;
-use iggy::consumer::Consumer as RustConsumer;
-use iggy::identifier::Identifier;
-use iggy::messages::poll_messages::PollingStrategy as RustPollingStrategy;
-use iggy::messages::send_messages::{Message as RustMessage, Partitioning};
-use iggy::utils::expiry::IggyExpiry;
-use iggy::utils::topic_size::MaxTopicSize;
+use iggy::prelude::{
+ Consumer as RustConsumer, IggyClient as RustIggyClient, IggyMessage as
RustMessage,
+ PollingStrategy as RustPollingStrategy, *,
+};
use pyo3::prelude::*;
use pyo3::types::PyList;
use pyo3_async_runtimes::tokio::future_into_py;
-use pyo3_stub_gen::{define_stub_info_gatherer, impl_stub_type};
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
+use pyo3_stub_gen::{define_stub_info_gatherer, impl_stub_type};
use crate::receive_message::{PollingStrategy, ReceiveMessage};
use crate::send_message::SendMessage;
diff --git a/foreign/python/src/lib.rs b/foreign/python/src/lib.rs
index 2bc54dd5..092cf954 100644
--- a/foreign/python/src/lib.rs
+++ b/foreign/python/src/lib.rs
@@ -24,7 +24,7 @@ mod topic;
use client::IggyClient;
use pyo3::prelude::*;
-use receive_message::{MessageState, PollingStrategy, ReceiveMessage};
+use receive_message::{PollingStrategy, ReceiveMessage};
use send_message::SendMessage;
use stream::StreamDetails;
use topic::TopicDetails;
@@ -38,6 +38,5 @@ fn iggy_py(_py: Python, m: &Bound<'_, PyModule>) ->
PyResult<()> {
m.add_class::<StreamDetails>()?;
m.add_class::<TopicDetails>()?;
m.add_class::<PollingStrategy>()?;
- m.add_class::<MessageState>()?;
Ok(())
}
diff --git a/foreign/python/src/receive_message.rs b/foreign/python/src/receive_message.rs
index e52d4ec6..f93d157e 100644
--- a/foreign/python/src/receive_message.rs
+++ b/foreign/python/src/receive_message.rs
@@ -16,9 +16,7 @@
* under the License.
*/
-use iggy::messages::poll_messages::PollingStrategy as RustPollingStrategy;
-use iggy::models::messages::MessageState as RustMessageState;
-use iggy::models::messages::PolledMessage as RustReceiveMessage;
+use iggy::prelude::{IggyMessage as RustReceiveMessage, PollingStrategy as
RustPollingStrategy};
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyclass_enum,
gen_stub_pymethods};
@@ -41,16 +39,6 @@ impl ReceiveMessage {
}
}
-#[gen_stub_pyclass_enum]
-#[pyclass(eq, eq_int)]
-#[derive(PartialEq)]
-pub enum MessageState {
- Available,
- Unavailable,
- Poisoned,
- MarkedForDeletion,
-}
-
#[gen_stub_pymethods]
#[pymethods]
impl ReceiveMessage {
@@ -65,47 +53,35 @@ impl ReceiveMessage {
///
/// The offset represents the position of the message within its topic.
pub fn offset(&self) -> u64 {
- self.inner.offset
+ self.inner.header.offset
}
/// Retrieves the timestamp of the received message.
///
/// The timestamp represents the time of the message within its topic.
pub fn timestamp(&self) -> u64 {
- self.inner.timestamp
+ self.inner.header.timestamp
}
/// Retrieves the id of the received message.
///
/// The id represents unique identifier of the message within its topic.
pub fn id(&self) -> u128 {
- self.inner.id
+ self.inner.header.id
}
/// Retrieves the checksum of the received message.
///
/// The checksum represents the integrity of the message within its topic.
- pub fn checksum(&self) -> u32 {
- self.inner.checksum
- }
-
- /// Retrieves the Message's state of the received message.
- ///
- /// State represents the state of the response.
- pub fn state(&self) -> MessageState {
- match self.inner.state {
- RustMessageState::Available => MessageState::Available,
- RustMessageState::Unavailable => MessageState::Unavailable,
- RustMessageState::Poisoned => MessageState::Poisoned,
- RustMessageState::MarkedForDeletion =>
MessageState::MarkedForDeletion,
- }
+ pub fn checksum(&self) -> u64 {
+ self.inner.header.checksum
}
/// Retrieves the length of the received message.
///
/// The length represents the length of the payload.
- pub fn length(&self) -> u64 {
- self.inner.length.as_bytes_u64()
+ pub fn length(&self) -> u32 {
+ self.inner.header.payload_length
}
}
diff --git a/foreign/python/src/send_message.rs b/foreign/python/src/send_message.rs
index 9b3aff76..18a0e5c1 100644
--- a/foreign/python/src/send_message.rs
+++ b/foreign/python/src/send_message.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use iggy::messages::send_messages::Message as RustSendMessage;
+use iggy::prelude::{IggyMessage as RustIggyMessage, IggyMessageHeader};
use pyo3::prelude::*;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
use std::str::FromStr;
@@ -28,17 +28,25 @@ use std::str::FromStr;
#[pyclass]
#[gen_stub_pyclass]
pub struct SendMessage {
- pub(crate) inner: RustSendMessage,
+ pub(crate) inner: RustIggyMessage,
}
-/// Provides the capability to clone a SendMessage.
-///
-/// This implementation creates a new `RustSendMessage` instance from
-/// the string representation of the original `RustSendMessage`.
impl Clone for SendMessage {
fn clone(&self) -> Self {
Self {
- inner: self.inner.clone(),
+ inner: RustIggyMessage {
+ header: IggyMessageHeader {
+ checksum: self.inner.header.checksum,
+ id: self.inner.header.id,
+ offset: self.inner.header.offset,
+ timestamp: self.inner.header.timestamp,
+ origin_timestamp: self.inner.header.origin_timestamp,
+ user_headers_length: self.inner.header.user_headers_length,
+ payload_length: self.inner.header.payload_length,
+ },
+ payload: self.inner.payload.clone(),
+ user_headers: self.inner.user_headers.clone(),
+ },
}
}
}
@@ -53,7 +61,7 @@ impl SendMessage {
#[new]
pub fn new(data: String) -> Self {
// TODO: handle errors
- let inner = RustSendMessage::from_str(&data).unwrap();
+ let inner = RustIggyMessage::from_str(&data).unwrap();
Self { inner }
}
}
diff --git a/foreign/python/src/stream.rs b/foreign/python/src/stream.rs
index 82577c5c..d02ec823 100644
--- a/foreign/python/src/stream.rs
+++ b/foreign/python/src/stream.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use iggy::models::stream::StreamDetails as RustStreamDetails;
+use iggy::prelude::StreamDetails as RustStreamDetails;
use pyo3::prelude::*;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
diff --git a/foreign/python/src/topic.rs b/foreign/python/src/topic.rs
index 6b50aa35..0eef9d25 100644
--- a/foreign/python/src/topic.rs
+++ b/foreign/python/src/topic.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use iggy::models::topic::TopicDetails as RustTopicDetails;
+use iggy::prelude::TopicDetails as RustTopicDetails;
use pyo3::prelude::*;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
diff --git a/foreign/python/tests/conftest.py b/foreign/python/tests/conftest.py
index b8e273b2..f28a09fa 100644
--- a/foreign/python/tests/conftest.py
+++ b/foreign/python/tests/conftest.py
@@ -15,92 +15,91 @@
# specific language governing permissions and limitations
# under the License.
+"""
+Test configuration and fixtures for the Python SDK test suite.
+
+This module provides pytest fixtures for setting up test environments
+and connecting to Iggy servers in various configurations.
+"""
+
+import asyncio
+import os
import socket
-import pytest
import time
-from typing import Generator
-from testcontainers.core.container import DockerContainer
+from typing import Generator, Optional
+
+import pytest
from iggy_py import IggyClient
[email protected](scope="session")
-def iggy_container() -> Generator[DockerContainer, None, None]:
- """
- Creates and starts an Iggy server container using Docker.
+def get_server_config() -> tuple[str, int]:
"""
- container = DockerContainer("iggyrs/iggy:0.4.21").with_exposed_ports(
- 8080, 3000, 8090
- )
- container.start()
-
- yield container
-
- container.stop()
-
-
[email protected](scope="session")
-async def iggy_client(iggy_container: DockerContainer) -> IggyClient:
+ Get server configuration from environment variables or defaults.
+
+ Returns:
+ tuple: (host, port) for the Iggy server
"""
- Initializes and returns an Iggy client connected to the running Iggy server container.
-
- This fixture ensures that the client is authenticated and ready for use in tests.
-
- :param iggy_container: The running Iggy container fixture.
- :return: An instance of IggyClient connected to the server.
- """
- host = iggy_container.get_container_host_ip()
- port = iggy_container.get_exposed_port(8090)
- wait_for_container(port, host, timeout=30, interval=5)
-
- client = IggyClient(f"{host}:{port}")
-
- await client.connect()
-
- await wait_for_ping(client, timeout=30, interval=5)
-
- await client.login_user("iggy", "iggy")
- return client
-
-
-def wait_for_container(port: int, host: str, timeout: int, interval: int) -> None:
+ host = os.environ.get("IGGY_SERVER_HOST", "127.0.0.1")
+ port = int(os.environ.get("IGGY_SERVER_TCP_PORT", "8090"))
+
+ # Convert hostname to IP address for the Rust client
+ if host not in ("127.0.0.1", "localhost"):
+ try:
+ # Resolve hostname to IP address
+ host_ip = socket.gethostbyname(host)
+ host = host_ip
+ except socket.gaierror:
+ # If resolution fails, keep the original host
+ pass
+ elif host == "localhost":
+ host = "127.0.0.1"
+
+ return host, port
+
+
+def wait_for_server(host: str, port: int, timeout: int = 60, interval: int = 2) -> None:
"""
- Waits for a container to become alive by polling a specified port.
-
- :param port: The port number to poll.
- :param host: The hostname or IP address of the container (default is 'localhost').
- :param timeout: The maximum time in seconds to wait for the container to become available (default is 30).
- :param interval: The time in seconds between each polling attempt (default is 2).
+ Wait for the server to become available.
+
+ Args:
+ host: Server hostname or IP
+ port: Server port
+ timeout: Maximum time to wait in seconds
+ interval: Time between connection attempts in seconds
+
+ Raises:
+ TimeoutError: If server doesn't become available within timeout
"""
start_time = time.time()
-
+
while True:
try:
with socket.create_connection((host, port), timeout=interval):
return
- except (socket.timeout, ConnectionRefusedError):
+ except (socket.timeout, ConnectionRefusedError, OSError):
elapsed_time = time.time() - start_time
if elapsed_time >= timeout:
raise TimeoutError(
- f"Timed out after {timeout} seconds waiting for container to become available at {host}:{port}"
+ f"Server not available at {host}:{port} after {timeout}s"
)
-
time.sleep(interval)
-async def wait_for_ping(
- client: IggyClient, timeout: int = 30, interval: int = 5
-) -> None:
+async def wait_for_ping(client: IggyClient, timeout: int = 30, interval: int = 2) -> None:
"""
- Waits for the Iggy server to respond to ping requests before proceeding.
-
- :param client: The Iggy client instance.
- :param timeout: The maximum time in seconds to wait for the server to respond.
- :param interval: The time in seconds between each ping attempt.
- :raises TimeoutError: If the server does not respond within the timeout period.
+ Wait for the server to respond to ping requests.
+
+ Args:
+ client: Iggy client instance
+ timeout: Maximum time to wait in seconds
+ interval: Time between ping attempts in seconds
+
+ Raises:
+ TimeoutError: If server doesn't respond to ping within timeout
"""
start_time = time.time()
-
+
while True:
try:
await client.ping()
@@ -109,7 +108,68 @@ async def wait_for_ping(
elapsed_time = time.time() - start_time
if elapsed_time >= timeout:
raise TimeoutError(
- f"Timed out after {timeout} seconds waiting for Iggy server to respond to ping."
+ f"Server not responding to ping after {timeout}s"
)
+ await asyncio.sleep(interval)
- time.sleep(interval)
+
[email protected](scope="session")
+async def iggy_client() -> IggyClient:
+ """
+ Create and configure an Iggy client for testing.
+
+ This fixture:
+ 1. Gets server configuration from environment
+ 2. Waits for server to be available
+ 3. Creates and connects the client
+ 4. Authenticates with default credentials
+ 5. Verifies connectivity with ping
+
+ Returns:
+ IggyClient: Authenticated client ready for testing
+ """
+ host, port = get_server_config()
+
+ # Wait for server to be ready
+ wait_for_server(host, port)
+
+ # Create and connect client
+ client = IggyClient(f"{host}:{port}")
+ await client.connect()
+
+ # Wait for server to be fully ready
+ await wait_for_ping(client)
+
+ # Authenticate
+ await client.login_user("iggy", "iggy")
+
+ return client
+
+
[email protected](scope="session", autouse=True)
+def configure_asyncio():
+ """Configure asyncio settings for tests."""
+ # Set event loop policy if needed
+ if os.name == 'nt': # Windows
+ asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
+
+
+# Pytest configuration
+def pytest_configure(config):
+ """Configure pytest with custom markers."""
+ config.addinivalue_line(
+ "markers",
+ "integration: marks tests as integration tests (may be slow)"
+ )
+ config.addinivalue_line(
+ "markers",
+ "unit: marks tests as unit tests (fast)"
+ )
+
+
+def pytest_collection_modifyitems(config, items):
+ """Modify test collection to add markers automatically."""
+ for item in items:
+ # Mark all tests in test_iggy_sdk.py as integration tests
+ if "test_iggy_sdk" in item.nodeid:
+ item.add_marker(pytest.mark.integration)
diff --git a/foreign/python/tests/test_client.py b/foreign/python/tests/test_client.py
deleted file mode 100644
index 3bec0680..00000000
--- a/foreign/python/tests/test_client.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from iggy_py import PollingStrategy
-from iggy_py import SendMessage as Message
-from iggy_py import IggyClient
-
-STREAM_NAME = "test-stream"
-TOPIC_NAME = "test-topic"
-PARTITION_ID = 1
-
-
-async def test_send_and_poll_messages(iggy_client: IggyClient):
- assert iggy_client is not None
-
- await iggy_client.create_stream(STREAM_NAME)
- stream = await iggy_client.get_stream(STREAM_NAME)
- assert stream is not None
- assert stream.name == STREAM_NAME
-
- await iggy_client.create_topic(STREAM_NAME, TOPIC_NAME, partitions_count=1)
- topic = await iggy_client.get_topic(STREAM_NAME, TOPIC_NAME)
- assert topic is not None
- assert topic.name == TOPIC_NAME
-
- messages = [
- Message("Message 1"),
- Message("Message 2"),
- ]
- await iggy_client.send_messages(STREAM_NAME, TOPIC_NAME, PARTITION_ID, messages)
-
- polled_messages = await iggy_client.poll_messages(
- STREAM_NAME,
- TOPIC_NAME,
- PARTITION_ID,
- PollingStrategy.Next(),
- count=10,
- auto_commit=True,
- )
-
- assert len(polled_messages) >= 2
- assert polled_messages[0].payload().decode("utf-8") == "Message 1"
- assert polled_messages[1].payload().decode("utf-8") == "Message 2"
diff --git a/foreign/python/tests/test_iggy_sdk.py b/foreign/python/tests/test_iggy_sdk.py
new file mode 100644
index 00000000..7756eaea
--- /dev/null
+++ b/foreign/python/tests/test_iggy_sdk.py
@@ -0,0 +1,368 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Comprehensive test suite for the Python SDK.
+
+This module contains all the tests for the Iggy Python SDK, organized by functionality.
+Tests are marked as either 'unit' or 'integration' based on their requirements.
+"""
+
+import uuid
+from typing import List
+
+import pytest
+
+from iggy_py import IggyClient, PollingStrategy
+from iggy_py import SendMessage as Message
+
+
+class TestConnectivity:
+ """Test basic connectivity and authentication."""
+
+ @pytest.mark.asyncio
+ async def test_ping(self, iggy_client: IggyClient):
+ """Test server ping functionality."""
+ await iggy_client.ping()
+
+ @pytest.mark.asyncio
+ async def test_client_not_none(self, iggy_client: IggyClient):
+ """Test that client fixture is properly initialized."""
+ assert iggy_client is not None
+
+
+class TestStreamOperations:
+ """Test stream creation, retrieval, and management."""
+
+ @pytest.fixture
+ def unique_stream_name(self):
+ """Generate unique stream name for each test."""
+ return f"test-stream-{uuid.uuid4().hex[:8]}"
+
+ @pytest.mark.asyncio
+ async def test_create_and_get_stream(self, iggy_client: IggyClient, unique_stream_name):
+ """Test stream creation and retrieval."""
+ # Create stream
+ await iggy_client.create_stream(unique_stream_name)
+
+ # Get stream by name
+ stream = await iggy_client.get_stream(unique_stream_name)
+ assert stream is not None
+ assert stream.name == unique_stream_name
+ assert stream.id > 0
+
+ @pytest.mark.asyncio
+ async def test_list_streams(self, iggy_client: IggyClient, unique_stream_name):
+ """Test listing streams."""
+ # Create a stream first
+ await iggy_client.create_stream(unique_stream_name)
+
+ # Get the stream we just created
+ stream = await iggy_client.get_stream(unique_stream_name)
+ assert stream is not None
+ assert stream.name == unique_stream_name
+ assert stream.id > 0
+ assert stream.topics_count == 0 # New stream has no topics
+
+
+class TestTopicOperations:
+ """Test topic creation, retrieval, and management."""
+
+ @pytest.fixture
+ def unique_names(self):
+ """Generate unique stream and topic names."""
+ unique_id = uuid.uuid4().hex[:8]
+ return {
+ 'stream': f"test-stream-{unique_id}",
+ 'topic': f"test-topic-{unique_id}"
+ }
+
+ @pytest.mark.asyncio
+ async def test_create_and_get_topic(self, iggy_client: IggyClient, unique_names):
+ """Test topic creation and retrieval."""
+ stream_name = unique_names['stream']
+ topic_name = unique_names['topic']
+
+ # Create stream first
+ await iggy_client.create_stream(stream_name)
+
+ # Create topic
+ await iggy_client.create_topic(
+ stream=stream_name,
+ name=topic_name,
+ partitions_count=2
+ )
+
+ # Get topic by name
+ topic = await iggy_client.get_topic(stream_name, topic_name)
+ assert topic is not None
+ assert topic.name == topic_name
+ assert topic.id > 0
+ assert topic.partitions_count == 2
+
+ @pytest.mark.asyncio
+ async def test_list_topics(self, iggy_client: IggyClient, unique_names):
+ """Test listing topics in a stream."""
+ stream_name = unique_names['stream']
+ topic_name = unique_names['topic']
+
+ # Create stream and topic
+ await iggy_client.create_stream(stream_name)
+ await iggy_client.create_topic(
+ stream=stream_name,
+ name=topic_name,
+ partitions_count=1
+ )
+
+ # Get the topic we just created
+ topic = await iggy_client.get_topic(stream_name, topic_name)
+ assert topic is not None
+ assert topic.name == topic_name
+ assert topic.id > 0
+ assert topic.partitions_count == 1
+
+
+class TestMessageOperations:
+ """Test message sending, polling, and processing."""
+
+ @pytest.fixture
+ def message_setup(self):
+ """Setup unique names and test data for messaging tests."""
+ unique_id = uuid.uuid4().hex[:8]
+ return {
+ 'stream': f"msg-stream-{unique_id}",
+ 'topic': f"msg-topic-{unique_id}",
+ 'partition_id': 1,
+ 'messages': [
+ f"Test message {i} - {unique_id}" for i in range(1, 4)
+ ]
+ }
+
+ @pytest.mark.asyncio
+ async def test_send_and_poll_messages(self, iggy_client: IggyClient, message_setup):
+ """Test basic message sending and polling."""
+ stream_name = message_setup['stream']
+ topic_name = message_setup['topic']
+ partition_id = message_setup['partition_id']
+ test_messages = message_setup['messages']
+
+ # Setup stream and topic
+ await iggy_client.create_stream(stream_name)
+ await iggy_client.create_topic(
+ stream=stream_name,
+ name=topic_name,
+ partitions_count=1
+ )
+
+ # Send messages
+ messages = [Message(msg) for msg in test_messages]
+ await iggy_client.send_messages(
+ stream=stream_name,
+ topic=topic_name,
+ partitioning=partition_id,
+ messages=messages
+ )
+
+ # Poll messages
+ polled_messages = await iggy_client.poll_messages(
+ stream=stream_name,
+ topic=topic_name,
+ partition_id=partition_id,
+ polling_strategy=PollingStrategy.First(),
+ count=10,
+ auto_commit=True
+ )
+
+ # Verify we got our messages
+ assert len(polled_messages) >= len(test_messages)
+
+ # Check the first few messages match what we sent
+ for i, expected_msg in enumerate(test_messages):
+ if i < len(polled_messages):
+ actual_payload = polled_messages[i].payload().decode("utf-8")
+ assert actual_payload == expected_msg
+
+ @pytest.mark.asyncio
+ async def test_message_properties(self, iggy_client: IggyClient, message_setup):
+ """Test access to message properties."""
+ stream_name = message_setup['stream']
+ topic_name = message_setup['topic']
+ partition_id = message_setup['partition_id']
+
+ # Setup
+ await iggy_client.create_stream(stream_name)
+ await iggy_client.create_topic(
+ stream=stream_name,
+ name=topic_name,
+ partitions_count=1
+ )
+
+ # Send a test message
+ test_payload = f"Property test - {uuid.uuid4().hex[:8]}"
+ message = Message(test_payload)
+ await iggy_client.send_messages(
+ stream=stream_name,
+ topic=topic_name,
+ partitioning=partition_id,
+ messages=[message]
+ )
+
+ # Poll and verify properties
+ polled_messages = await iggy_client.poll_messages(
+ stream=stream_name,
+ topic=topic_name,
+ partition_id=partition_id,
+ polling_strategy=PollingStrategy.Last(),
+ count=1,
+ auto_commit=True
+ )
+
+ assert len(polled_messages) >= 1
+ msg = polled_messages[0]
+
+ # Test all message properties are accessible and have reasonable values
+ assert msg.payload().decode("utf-8") == test_payload
+ assert isinstance(msg.offset(), int) and msg.offset() >= 0
+ assert isinstance(msg.id(), int) and msg.id() > 0
+ assert isinstance(msg.timestamp(), int) and msg.timestamp() > 0
+ assert isinstance(msg.checksum(), int)
+ assert isinstance(msg.length(), int) and msg.length() > 0
+
+
+class TestPollingStrategies:
+ """Test different polling strategies."""
+
+ @pytest.fixture
+ def polling_setup(self):
+ """Setup for polling strategy tests."""
+ unique_id = uuid.uuid4().hex[:8]
+ return {
+ 'stream': f"poll-stream-{unique_id}",
+ 'topic': f"poll-topic-{unique_id}",
+ 'partition_id': 1,
+ 'messages': [f"Polling test {i} - {unique_id}" for i in range(5)]
+ }
+
+ @pytest.mark.asyncio
+ async def test_polling_strategies(self, iggy_client: IggyClient, polling_setup):
+ """Test different polling strategies work correctly."""
+ stream_name = polling_setup['stream']
+ topic_name = polling_setup['topic']
+ partition_id = polling_setup['partition_id']
+ test_messages = polling_setup['messages']
+
+ # Setup
+ await iggy_client.create_stream(stream_name)
+ await iggy_client.create_topic(
+ stream=stream_name,
+ name=topic_name,
+ partitions_count=1
+ )
+
+ # Send test messages
+ messages = [Message(msg) for msg in test_messages]
+ await iggy_client.send_messages(
+ stream=stream_name,
+ topic=topic_name,
+ partitioning=partition_id,
+ messages=messages
+ )
+
+ # Test First strategy
+ first_messages = await iggy_client.poll_messages(
+ stream=stream_name,
+ topic=topic_name,
+ partition_id=partition_id,
+ polling_strategy=PollingStrategy.First(),
+ count=1,
+ auto_commit=False
+ )
+ assert len(first_messages) >= 1
+
+ # Test Last strategy
+ last_messages = await iggy_client.poll_messages(
+ stream=stream_name,
+ topic=topic_name,
+ partition_id=partition_id,
+ polling_strategy=PollingStrategy.Last(),
+ count=1,
+ auto_commit=False
+ )
+ assert len(last_messages) >= 1
+
+ # Test Next strategy
+ next_messages = await iggy_client.poll_messages(
+ stream=stream_name,
+ topic=topic_name,
+ partition_id=partition_id,
+ polling_strategy=PollingStrategy.Next(),
+ count=2,
+ auto_commit=False
+ )
+ assert len(next_messages) >= 1
+
+ # Test Offset strategy (if we have messages)
+ if first_messages:
+ offset_messages = await iggy_client.poll_messages(
+ stream=stream_name,
+ topic=topic_name,
+ partition_id=partition_id,
+ polling_strategy=PollingStrategy.Offset(value=first_messages[0].offset()),
+ count=1,
+ auto_commit=False
+ )
+ assert len(offset_messages) >= 1
+
+
+class TestErrorHandling:
+ """Test error handling and edge cases."""
+
+ @pytest.mark.asyncio
+ async def test_duplicate_stream_creation(self, iggy_client: IggyClient):
+ """Test that creating duplicate streams raises appropriate errors."""
+ stream_name = f"duplicate-test-{uuid.uuid4().hex[:8]}"
+
+ # Create stream first time - should succeed
+ await iggy_client.create_stream(stream_name)
+
+ # Create same stream again - should fail
+ with pytest.raises(RuntimeError) as exc_info:
+ await iggy_client.create_stream(stream_name)
+
+ assert "StreamNameAlreadyExists" in str(exc_info.value)
+
+ @pytest.mark.asyncio
+ async def test_get_nonexistent_stream(self, iggy_client: IggyClient):
+ """Test getting a non-existent stream."""
+ nonexistent_name = f"nonexistent-{uuid.uuid4().hex}"
+
+ # get_stream returns None for non-existent streams
+ stream = await iggy_client.get_stream(nonexistent_name)
+ assert stream is None
+
+ @pytest.mark.asyncio
+ async def test_create_topic_in_nonexistent_stream(self, iggy_client: IggyClient):
+ """Test creating a topic in a non-existent stream."""
+ nonexistent_stream = f"nonexistent-{uuid.uuid4().hex}"
+ topic_name = "test-topic"
+
+ with pytest.raises(RuntimeError):
+ await iggy_client.create_topic(
+ stream=nonexistent_stream,
+ name=topic_name,
+ partitions_count=1
+ )
\ No newline at end of file