Hi Ahmet,

I filed the JIRA ticket https://issues.apache.org/jira/browse/BEAM-5497. Let me know if you need anything else from us.
Thank you,
Rakesh

On Mon, Sep 24, 2018 at 5:08 PM Ahmet Altay <al...@google.com> wrote:

> On Sun, Sep 23, 2018 at 9:21 PM, Rakesh Kumar <rakeshku...@lyft.com> wrote:
>
>> Thanks, Ahmet, for providing reference code. I will give it a try.
>>
>> I also tried to read the code, and it feels like you are using
>> multiprocessing for parallelizing runtime jobs. We wanted to use Gevent
>> because it is lightweight and good for parallelizing IO/network-bound jobs.
>>
>
> We are using this code for IO-bound operations. For example, in [1] it is
> used to make calls into GCS in parallel for batches of files.
>
> [1]
> https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313
>
>
>> I would also recommend providing Gevent support in the future, because it
>> uses resources efficiently and scales well under heavy load.
>>
>
> Do you mind filing a JIRA for the gevent issue so that we can keep track
> of it?
>
>
>> On Fri, Sep 21, 2018 at 5:58 PM Ahmet Altay <al...@google.com> wrote:
>>
>>> Thank you for the example; it helps.
>>>
>>> I still do not know what is wrong with gevent. Would you consider using
>>> the multiprocessing package? We are already using it to accomplish
>>> something similar in file-based sinks, and there is already a utility
>>> function that wraps it, similar to your example [1].
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117
>>>
>>> Ahmet
>>>
>>>
>>> On Wed, Sep 19, 2018 at 10:25 PM, Rakesh Kumar <rakeshku...@lyft.com> wrote:
>>>
>>>> Gevent <http://www.gevent.org/> is basically used to make parallel
>>>> network calls. We are using gevent in one of the transformation methods
>>>> to call internal services. The transformation method makes multiple
>>>> network calls in parallel.
>>>> Here is the code snippet:
>>>>
>>>> /__init__.py
>>>> import gevent.monkey
>>>> gevent.monkey.patch_all()
>>>>
>>>> /transform.py
>>>> from gevent import Greenlet
>>>> from gevent import joinall
>>>>
>>>> # utils, BATCH_SIZE and GREENLET_TIMEOUT are defined elsewhere (not shown).
>>>>
>>>>
>>>> def filter_out_invalid_users(events):
>>>>     key, user_id_data_pairs = events
>>>>     user_ids = [user_id for user_id, data in user_id_data_pairs]
>>>>
>>>>     jobs = []
>>>>     id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
>>>>     for id_chunk in id_chunks:
>>>>         # _call_users_service makes the network call.
>>>>         jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))
>>>>
>>>>     # Increase the timeout based on the number of greenlets we are
>>>>     # running, to account for yielding among greenlets.
>>>>     join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
>>>>     joinall(jobs, timeout=join_timeout)
>>>>
>>>>     # Keep only the greenlets that finished in time without raising.
>>>>     successful_jobs = [job for job in jobs if job.successful()]
>>>>     valid_user_ids = []
>>>>     for job in successful_jobs:
>>>>         network_response = job.get()
>>>>         valid_user_ids.append(network_response.user_id)
>>>>     yield valid_user_ids
>>>>
>>>>
>>>> def _call_users_service(user_ids):
>>>>     # Make the network call and return the response (details elided).
>>>>     ...
>>>>     return network_response
>>>>
>>>> On Tue, Sep 18, 2018 at 7:07 PM Ahmet Altay <al...@google.com> wrote:
>>>>
>>>>> I am also not familiar with gevent. Could you explain what you are
>>>>> trying to do and how you plan to use gevent?
>>>>>
>>>>> On Tue, Sep 18, 2018 at 9:38 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> I don't think anyone has tried what you're doing. The code that you're
>>>>>> working with is very new.
>>>>>>
>>>>>> On Mon, Sep 17, 2018 at 5:02 PM Micah Wylde <mwy...@lyft.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> We're using the Python SDK with the portable Flink runner and
>>>>>>> running into some problems integrating gevent.
>>>>>>> We're patching the gRPC runtime for gevent as described in [0], which
>>>>>>> allows pipelines to start and partially run. However, the tasks
>>>>>>> produce a stream of gevent exceptions:
>>>>>>>
>>>>>>> Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
>>>>>>> Traceback (most recent call last):
>>>>>>>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>>>>>>>   File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
>>>>>>>   File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
>>>>>>>   File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>   File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
>>>>>>> greenlet.error: cannot switch to a different thread
>>>>>>>
>>>>>>> and do not make any progress.
>>>>>>>
>>>>>>> Has anybody else successfully used the portable Python SDK with
>>>>>>> gevent? Or is there a recommended alternative for doing async IO in
>>>>>>> Python pipelines?
>>>>>>>
>>>>>>> [0] https://github.com/grpc/grpc/issues/4629#issuecomment-376962677
>>>>>>>
>>>>>>> Micah
>>>>
>>>> --
>>>> Rakesh Kumar
>>>> Software Engineer
>>>> 510-761-1364
>>>>
>>>> <https://www.lyft.com/>
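Ahmet's suggestion of the multiprocessing package can be sketched as a thread-pool version of `filter_out_invalid_users`. This is a minimal sketch under stated assumptions, not the Beam helper he linked (which lives in the SDK's internal package) and not Lyft's code. It uses only the standard library's `multiprocessing.pool.ThreadPool`, keeps the same chunking and join-timeout formula as the gevent snippet, and drops chunks whose call raises or misses the deadline. `chunk_list`, `POOL_SIZE`, the constant values, and the `_call_users_service` stub (which pretends even IDs are valid and returns a list of IDs rather than a response object) are hypothetical stand-ins for parts the thread does not show. It also assumes `gevent.monkey.patch_all()` is not in effect, since patching would presumably turn the pool's threads back into greenlets.

```python
import time
from multiprocessing import TimeoutError as PoolTimeoutError
from multiprocessing.pool import ThreadPool

# Hypothetical values standing in for the constants in the original snippet.
BATCH_SIZE = 100
GREENLET_TIMEOUT = 5.0  # assumed per-call budget in seconds
POOL_SIZE = 8  # hypothetical cap on concurrent calls per element


def chunk_list(items, size):
    """Hypothetical stand-in for utils.chunk_list_evenly: fixed-size chunks."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def _call_users_service(user_ids):
    """Hypothetical stub for the real network call.

    Pretends even IDs are valid and returns them as a list.
    """
    return [user_id for user_id in user_ids if user_id % 2 == 0]


def filter_out_invalid_users(events):
    """Fan out one service call per ID chunk on a thread pool."""
    _key, user_id_data_pairs = events
    user_ids = [user_id for user_id, _ in user_id_data_pairs]
    chunks = chunk_list(user_ids, BATCH_SIZE)
    if not chunks:
        yield []
        return

    # Same overall budget as the gevent version: it grows with the call count.
    join_timeout = GREENLET_TIMEOUT + len(chunks) * GREENLET_TIMEOUT * 0.1
    deadline = time.monotonic() + join_timeout

    pool = ThreadPool(min(POOL_SIZE, len(chunks)))
    try:
        pending = [pool.apply_async(_call_users_service, (chunk,))
                   for chunk in chunks]
        valid_user_ids = []
        for result in pending:
            # Every chunk shares one deadline, like joinall(timeout=...).
            remaining = max(0.0, deadline - time.monotonic())
            try:
                valid_user_ids.extend(result.get(timeout=remaining))
            except PoolTimeoutError:
                # Missed the shared deadline: drop this chunk.
                continue
            except Exception:
                # The call raised: drop it, like filtering on job.successful().
                continue
    finally:
        # Stop the pool; calls already running on a thread are not interrupted.
        pool.terminate()
    yield valid_user_ids
```

Plain threads are heavier than greenlets, which was Rakesh's concern, so `POOL_SIZE` caps how many calls run at once per element. As with `joinall`, a call that misses the deadline is not killed; its result is simply ignored. Creating a pool per element also has a cost, so reusing one pool across elements may be worth measuring.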