[ https://issues.apache.org/jira/browse/BEAM-5497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17137579#comment-17137579 ]
Beam JIRA Bot commented on BEAM-5497:
-------------------------------------

This issue was marked "stale-P2" and has not received a public comment in 14 days. It is now automatically moved to P3. If you are still affected by it, you can comment and move it back to P2.

> Provide support for Gevent
> --------------------------
>
> Key: BEAM-5497
> URL: https://issues.apache.org/jira/browse/BEAM-5497
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-harness
> Reporter: Rakesh Kumar
> Priority: P3
>
> [Gevent|http://www.gevent.org/] is commonly used to make parallel network calls. We use gevent in one of our transformation methods to call internal services. Before using gevent, we also patch it as described [here|https://github.com/grpc/grpc/issues/4629#issuecomment-376962677]. The transformation method makes multiple network calls in parallel. Here is the code snippet:
> {code}
> # __init__.py
> import gevent.monkey
> gevent.monkey.patch_all()
>
> # transform.py
> from gevent import Greenlet
> from gevent import joinall
>
> # utils, BATCH_SIZE and GREENLET_TIMEOUT are defined elsewhere (not shown).
>
> def filter_out_invalid_users(events):
>     key, user_id_data_pairs = events
>     user_ids = [user_id for user_id, data in user_id_data_pairs]
>     jobs = []
>     id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
>     for id_chunk in id_chunks:
>         # _call_users_service makes the network call.
>         jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))
>     # Increase the join timeout with the number of greenlets being run,
>     # to account for the time they spend yielding to one another.
>     join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
>     joinall(jobs, timeout=join_timeout)
>     successful_jobs = [job for job in jobs if job.successful()]
>     valid_user_ids = []
>     for job in successful_jobs:
>         network_response = job.get()
>         valid_user_ids.append(network_response.user_id)
>     yield valid_user_ids
>
> def _call_users_service(user_ids):
>     # make network call and return response
>     ..
>     ..
>     return network_response
> {code}
>
> This allows pipelines to start and partially run. However, the tasks produce a stream of gevent exceptions and make no progress:
> {code}
> Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
> Traceback (most recent call last):
>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>   File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
>   File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
>   File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
> greenlet.error: cannot switch to a different thread
> {code}
>
> The alternative approach is to use the multiprocessing module, as shown [here|https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117] and [here|https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313].
> Gevent is lightweight and well suited to parallelizing IO- and network-bound jobs: it uses resources efficiently and scales well under heavy load.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
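For comparison, here is a hedged sketch of the thread-based alternative the issue mentions, written with the standard library's `concurrent.futures` rather than the `multiprocessing` helpers linked above. It mirrors the gevent version (spawn one task per chunk, wait up to a timeout, keep only the calls that succeeded) but uses ordinary OS threads, so no monkey-patching is needed and the greenlet thread-switching error in the traceback cannot occur. `call_users_service`, `chunk_list`, `BATCH_SIZE` and `CALL_TIMEOUT_SECONDS` are hypothetical stand-ins, not the reporter's code or a Beam API; the fake service treats even ids as valid so the sketch runs offline.

```python
from concurrent.futures import ThreadPoolExecutor, wait

# Illustrative values; the reporter's BATCH_SIZE and GREENLET_TIMEOUT are not shown.
BATCH_SIZE = 2
CALL_TIMEOUT_SECONDS = 5.0


def call_users_service(user_ids):
    """Hypothetical stand-in for _call_users_service.

    A real version would make a blocking network call; this one treats
    even ids as valid so the sketch runs without a network.
    """
    return [user_id for user_id in user_ids if user_id % 2 == 0]


def chunk_list(items, size):
    """Split items into consecutive lists of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def filter_out_invalid_users(events, max_workers=4):
    """Thread-pool variant of the gevent-based transform (sketch only)."""
    key, user_id_data_pairs = events
    user_ids = [user_id for user_id, _ in user_id_data_pairs]
    # Plain OS threads: no gevent.monkey.patch_all() required.
    executor = ThreadPoolExecutor(max_workers=max_workers)
    futures = [executor.submit(call_users_service, c)
               for c in chunk_list(user_ids, BATCH_SIZE)]
    # Like gevent.joinall(jobs, timeout=...): wait up to the timeout,
    # then keep whatever has finished.
    done, _not_done = wait(futures, timeout=CALL_TIMEOUT_SECONDS)
    # Do not block on stragglers; any unfinished calls keep running in the background.
    executor.shutdown(wait=False)
    valid_user_ids = []
    for future in futures:  # submission order keeps the output deterministic
        # Equivalent of job.successful(): finished without raising.
        if future in done and future.exception() is None:
            valid_user_ids.extend(future.result())
    yield valid_user_ids


print(list(filter_out_invalid_users(("k", [(1, "a"), (2, "b"), (3, "c"), (4, "d")]))))
```

Threads cost more memory than greenlets, so `max_workers` should stay modest; for blocking network calls the GIL is released while a thread waits on the socket, so the calls still overlap.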