On Sun, Sep 23, 2018 at 9:21 PM, Rakesh Kumar <rakeshku...@lyft.com> wrote:
> Thanks Ahmet for providing the reference code. I will give it a try.
>
> I also tried to read the code, and it feels like you are using the
> multiprocessing package to parallelize runtime jobs. We wanted to use gevent
> because it is lightweight and good for parallelizing IO/network-bound jobs.
>

We are using this code for IO-bound operations. For example [1], here it is
used to make calls into GCS in parallel with batches of files.

[1] https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313

> I would also recommend providing gevent support in the future, because it
> uses resources efficiently and scales well under heavy load.
>

Do you mind filing a JIRA for the gevent issue so that we can keep track of
it?

>
> On Fri, Sep 21, 2018 at 5:58 PM Ahmet Altay <al...@google.com> wrote:
>
>> Thank you for the example, it helps.
>>
>> I still do not know what is wrong with gevent. Would you consider using the
>> multiprocessing package? We are already using it to accomplish something
>> similar in file-based sinks, and there is already a utility function that
>> wraps it, similar to your example [1].
>>
>> [1] https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117
>>
>> Ahmet
>>
>> On Wed, Sep 19, 2018 at 10:25 PM, Rakesh Kumar <rakeshku...@lyft.com> wrote:
>>
>>> Gevent <http://www.gevent.org/> is basically used to make parallel
>>> network calls. We are using gevent in one of the transformation methods to
>>> call internal services. The transformation method makes multiple network
>>> calls in parallel.
>>> Here is the code snippet:
>>>
>>> /__init__.py
>>>
>>> import gevent.monkey
>>> gevent.monkey.patch_all()
>>>
>>> /transform.py
>>>
>>> from gevent import Greenlet
>>> from gevent import joinall
>>>
>>> def filter_out_invalid_users(events):
>>>     key, user_id_data_pairs = events
>>>     user_ids = [user_id for user_id, data in user_id_data_pairs]
>>>
>>>     jobs = []
>>>     id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
>>>     for id_chunk in id_chunks:
>>>         # _call_users_service makes the network call for one batch.
>>>         jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))
>>>
>>>     # Increase the timeout based on the number of greenlets we are
>>>     # running, to account for yielding among greenlets.
>>>     join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
>>>     joinall(jobs, timeout=join_timeout)
>>>
>>>     successful_jobs = [job for job in jobs if job.successful()]
>>>     valid_user_ids = []
>>>     for job in successful_jobs:
>>>         network_response = job.get()
>>>         valid_user_ids.append(network_response.user_id)
>>>     yield valid_user_ids
>>>
>>> def _call_users_service(user_ids):
>>>     # make network call and return response
>>>     ...
>>>     return network_response
>>>
>>> On Tue, Sep 18, 2018 at 7:07 PM Ahmet Altay <al...@google.com> wrote:
>>>
>>>> I am also not familiar with gevent. Could you explain what you are
>>>> trying to do and how you plan to use gevent?
>>>>
>>>> On Tue, Sep 18, 2018 at 9:38 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> I don't think anyone has tried what you're doing. The code that you're
>>>>> working with is very new.
>>>>>
>>>>> On Mon, Sep 17, 2018 at 5:02 PM Micah Wylde <mwy...@lyft.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> We're using the Python SDK with the portable Flink runner and running
>>>>>> into some problems integrating gevent. We're patching the gRPC runtime
>>>>>> for gevent as described in [0], which allows pipelines to start and
>>>>>> partially run.
>>>>>> However, the tasks produce a stream of gevent exceptions:
>>>>>>
>>>>>> Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
>>>>>> Traceback (most recent call last):
>>>>>>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>>>>>>   File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
>>>>>>   File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
>>>>>>   File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>   File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>   File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
>>>>>> greenlet.error: cannot switch to a different thread
>>>>>>
>>>>>> and do not make any progress.
>>>>>>
>>>>>> Has anybody else successfully used the portable Python SDK with
>>>>>> gevent? Or is there a recommended alternative for doing async IO in
>>>>>> Python pipelines?
>>>>>>
>>>>>> [0] https://github.com/grpc/grpc/issues/4629#issuecomment-376962677
>>>>>>
>>>>>> Micah
>>>
>>> --
>>> Rakesh Kumar
>>> Software Engineer
>>> 510-761-1364
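For reference, the thread-pool route suggested above can be sketched without gevent. This is a minimal sketch under stated assumptions, not Beam's internal helper: it assumes Python 3's `concurrent.futures` (or the `futures` backport on Python 2), and `BATCH_SIZE`, `CALL_TIMEOUT_SECS`, `chunk_list` and `call_users_service` are hypothetical stand-ins for the batch size, `GREENLET_TIMEOUT`, `utils.chunk_list_evenly` and `_call_users_service` in the snippet above. It fans the batches out to a bounded pool of OS threads, waits with the same deadline formula as the greenlet version, and keeps only the batches that finished without raising.

```python
from concurrent.futures import ThreadPoolExecutor, wait

# Hypothetical settings; tune for the real service.
BATCH_SIZE = 100
CALL_TIMEOUT_SECS = 5.0  # plays the role of GREENLET_TIMEOUT


def chunk_list(items, size):
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def call_users_service(user_ids):
    """Hypothetical stand-in for the network call: returns the IDs it accepts."""
    return [user_id for user_id in user_ids if user_id % 2 == 0]


def filter_out_invalid_users(element, max_workers=8):
    """Generator usable with beam.FlatMap; yields one list of valid user IDs."""
    _key, user_id_data_pairs = element
    user_ids = [user_id for user_id, _data in user_id_data_pairs]
    chunks = chunk_list(user_ids, BATCH_SIZE)

    # Same overall deadline as the greenlet version: a base timeout plus
    # 10% of it per batch.
    join_timeout = CALL_TIMEOUT_SECS + len(chunks) * CALL_TIMEOUT_SECS * 0.1

    pool = ThreadPoolExecutor(max_workers=max_workers)
    futures = []
    try:
        futures = [pool.submit(call_users_service, chunk) for chunk in chunks]
        done, _not_done = wait(futures, timeout=join_timeout)
    finally:
        # Cancel batches that never started; batches already running cannot
        # be interrupted, so we simply stop waiting for them.
        for future in futures:
            future.cancel()
        pool.shutdown(wait=False)

    valid_user_ids = []
    for future in futures:  # submission order keeps the output stable
        if future in done and future.exception() is None:
            valid_user_ids.extend(future.result())
    yield valid_user_ids
```

Because it uses ordinary threads and no monkey-patching, this presumably sidesteps the patched primitives that the `cannot switch to a different thread` error points at. The trade-off is one OS thread per in-flight batch, bounded by `max_workers`; creating the pool once per worker rather than once per element would amortize thread start-up.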