gromsterus commented on issue #52: URL: https://github.com/apache/pulsar-client-python/issues/52#issuecomment-1331889157
Hi, @whisust. Gunicorn forks the main process on worker init; because of this, your approach won't work :( You could use the [Flask Extensions Pattern](https://flask.palletsprojects.com/en/2.2.x/extensiondev/) & [Gunicorn Server Hooks](https://docs.gunicorn.org/en/stable/settings.html#server-hooks) together ```python # test_pulsar.py import logging import uuid from datetime import datetime from typing import Optional from _pulsar import Result from flask import Flask, make_response from pulsar import Client, MessageId, Producer, schema logging.basicConfig( level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s' ) logger = logging.getLogger(__name__) pulsar_logger = logging.getLogger('pulsar') class PulsarExt: def __init__(self, url: str) -> None: self._url = url self._client: Optional[Client] = None self._g_producer: Optional[Producer] = None def init_app(self, app_: Flask) -> None: app_.extensions['pulsar'] = self @property def client(self) -> Client: assert self._client is not None, 'Call `connect()` first' return self._client @property def g_producer(self) -> Producer: assert self._g_producer is not None, 'Call `connect()` first' return self._g_producer def connect(self) -> None: self._client = Client(self._url, authentication=None, logger=pulsar_logger) self._g_producer = self._client.create_producer( 'non-persistent://public/default/test-gunicorn', producer_name=f'my-producer-{uuid.uuid4()}', schema=schema.StringSchema(), ) def close(self) -> None: if self._g_producer: self._g_producer.close() if self._client: self._client.close() self._g_producer = None self._client = None app = Flask(__name__) pulsar_ext = PulsarExt('pulsar://pulsar:6650') pulsar_ext.init_app(app) def init_app() -> None: pulsar_ext.connect() def teardown_app() -> None: pulsar_ext.close() @app.post('/post-message') def post_pulsar_message(): logger.info( 'Calling producer.send_async now, ' 'in the next lines there should be the callback result' ) dt = datetime.now() 
pulsar_ext.g_producer.send_async(content=f'dt={dt.isoformat()}', callback=callback) logger.info('After producer.send_async, returning the http response') return '', 201 def callback(res: Result, _msg_id: MessageId): logger.info(f'Callback result here! Event acknowledged by the broker.') @app.get('/') def healthcheck(): logger.info('API Running fine') return make_response({'status': 'healthy'}) # gunicorn_conf.py from case_fixed import init_app, teardown_app def post_worker_init(worker): init_app() def worker_int(worker): teardown_app() def worker_abort(worker): teardown_app() ``` And start the web server with ```sh gunicorn test_pulsar:app --config gunicorn_conf.py --workers=2 --preload ``` Now the client and producer are created after the fork and init of a worker. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
