On Sun, Sep 23, 2018 at 9:21 PM, Rakesh Kumar <rakeshku...@lyft.com> wrote:

> Thanks Ahmet for providing a reference code. I will give it a try.
>
> I also read through the code, and it looks like you are using multiprocessing
> to parallelize runtime jobs. We wanted to use gevent because it is
> lightweight and well suited to parallelizing IO/network-bound jobs.
>

We are using this code for IO-bound operations. For example, in [1] it is
used to make calls to GCS in parallel on batches of files.

[1]
https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313
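The pattern described here (running IO-bound calls in parallel over batches) can be sketched with a thread pool from the standard `multiprocessing.pool` module. Threads rather than processes suit IO-bound work because a thread blocked on IO releases the GIL. This is a minimal sketch under that assumption, not Beam's code: `chunk`, `fetch_batch` and `run_in_thread_pool` are hypothetical names, and `fetch_batch` only simulates the remote call.

```python
from multiprocessing.pool import ThreadPool


def chunk(items, size):
    """Split items into consecutive batches of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def fetch_batch(batch):
    """Hypothetical stand-in for one IO-bound request (e.g. a GCS call) per batch."""
    return [name.upper() for name in batch]


def run_in_thread_pool(fn, inputs, pool_size):
    """Apply fn to each input on a pool of threads; results keep input order."""
    if not inputs:
        return []
    pool = ThreadPool(min(pool_size, len(inputs)))
    try:
        # map() blocks until every batch is done; threads overlap while blocked on IO.
        return pool.map(fn, inputs)
    finally:
        pool.terminate()


files = ["a.txt", "b.txt", "c.txt", "d.txt", "e.txt"]
results = run_in_thread_pool(fetch_batch, chunk(files, 2), pool_size=4)
print(results)  # [['A.TXT', 'B.TXT'], ['C.TXT', 'D.TXT'], ['E.TXT']]
```

Capping the pool at `len(inputs)` avoids starting idle threads when there are fewer batches than workers.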


> I would also recommend providing gevent support in the future, because it
> can use resources efficiently and scale well under heavy load.
>

Do you mind filing a JIRA for the gevent issue so that we can keep track of
it?


>
>
>
> On Fri, Sep 21, 2018 at 5:58 PM Ahmet Altay <al...@google.com> wrote:
>
>> Thank you for the example; it helps.
>>
>> I still do not know what is wrong with gevent. Would you consider using
>> the multiprocessing package? We are already using it to accomplish something
>> similar in file-based sinks, and there is already a utility function that
>> wraps it, similar to your example [1].
>>
>> [1] https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117
>>
>> Ahmet
>>
>>
>> On Wed, Sep 19, 2018 at 10:25 PM, Rakesh Kumar <rakeshku...@lyft.com>
>> wrote:
>>
>>>
>>> Gevent <http://www.gevent.org/> is used to make parallel network
>>> calls. We are using gevent in one of our transformation methods to
>>> call internal services; the method makes multiple network calls in
>>> parallel. Here is the code snippet:
>>> # __init__.py
>>> import gevent.monkey
>>> gevent.monkey.patch_all()
>>>
>>> # transform.py
>>> from gevent import Greenlet, joinall
>>>
>>> def filter_out_invalid_users(events):
>>>     key, user_id_data_pairs = events
>>>     user_ids = [user_id for user_id, data in user_id_data_pairs]
>>>
>>>     jobs = []
>>>     id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
>>>     for id_chunk in id_chunks:
>>>         # _call_users_service makes the network call.
>>>         jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))
>>>
>>>     # Increase the timeout based on the number of greenlets we are
>>>     # running, to account for yielding among greenlets.
>>>     join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
>>>     joinall(jobs, timeout=join_timeout)
>>>
>>>     successful_jobs = [job for job in jobs if job.successful()]
>>>     valid_user_ids = []
>>>     for job in successful_jobs:
>>>         network_response = job.get()
>>>         valid_user_ids.append(network_response.user_id)
>>>     yield valid_user_ids
>>>
>>> def _call_users_service(user_ids):
>>>     # Make the network call and return the response.
>>>     ...
>>>     return network_response
>>>
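For comparison, the same fan-out / deadline / keep-only-successful-batches pattern can be sketched with threads from the standard library's `concurrent.futures`, which needs no monkey patching. This is a sketch under stated assumptions, not Lyft's code: `call_users_service`, `GREENLET_TIMEOUT`, `BATCH_SIZE` and the response shape (a list of valid IDs per batch) are hypothetical. Only the timeout formula, base + n * base * 0.1, comes from the snippet above.

```python
import concurrent.futures as cf

GREENLET_TIMEOUT = 2.0  # hypothetical base timeout in seconds
BATCH_SIZE = 2          # hypothetical batch size


def join_timeout(num_jobs, base=GREENLET_TIMEOUT):
    """The snippet's formula: base + num_jobs * base * 0.1."""
    return base + num_jobs * base * 0.1


def call_users_service(user_ids):
    """Hypothetical network call; raises for one batch to show filtering."""
    if "bad" in user_ids:
        raise IOError("service error")
    return [uid for uid in user_ids if uid.startswith("u")]


def filter_out_invalid_users(events, executor):
    _key, user_id_data_pairs = events
    user_ids = [user_id for user_id, _data in user_id_data_pairs]
    batches = [user_ids[i:i + BATCH_SIZE]
               for i in range(0, len(user_ids), BATCH_SIZE)]
    futures = [executor.submit(call_users_service, b) for b in batches]

    # Wait for all batches, but no longer than the scaled timeout.
    done, _not_done = cf.wait(futures, timeout=join_timeout(len(futures)))

    valid_user_ids = []
    for future in futures:
        # Keep batches that finished in time without raising
        # (the counterpart of job.successful() in the gevent version).
        if future in done and future.exception() is None:
            valid_user_ids.extend(future.result())
    yield valid_user_ids


events = ("key", [("u1", {}), ("x2", {}), ("bad", {}), ("u4", {}), ("u5", {})])
with cf.ThreadPoolExecutor(max_workers=4) as pool:
    result = list(filter_out_invalid_users(events, pool))
print(result)  # [['u1', 'u5']]
```

Batches that miss the deadline are skipped rather than cancelled, and leaving the `with` block waits for any such threads to finish.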
>>> On Tue, Sep 18, 2018 at 7:07 PM Ahmet Altay <al...@google.com> wrote:
>>>
>>>> I am also not familiar with gevent. Could you explain what you are
>>>> trying to do and how you plan to use gevent?
>>>>
>>>> On Tue, Sep 18, 2018 at 9:38 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> I don't think anyone has tried what you're doing. The code that you're
>>>>> working with is very new.
>>>>>
>>>>> On Mon, Sep 17, 2018 at 5:02 PM Micah Wylde <mwy...@lyft.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> We're using the Python SDK with the portable Flink runner and running
>>>>>> into some problems integrating gevent. We're patching the gRPC runtime
>>>>>> for gevent as described in [0], which allows pipelines to start and
>>>>>> partially run. However, the tasks produce a stream of gevent exceptions:
>>>>>>
>>>>>> Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
>>>>>> Traceback (most recent call last):
>>>>>>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>>>>>>   File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
>>>>>>   File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
>>>>>>   File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>   File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>   File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
>>>>>> greenlet.error: cannot switch to a different thread
>>>>>>
>>>>>> and do not make any progress.
>>>>>>
>>>>>> Has anybody else successfully used the portable Python SDK with
>>>>>> gevent? Or is there a recommended alternative for doing async IO in
>>>>>> Python pipelines?
>>>>>>
>>>>>> [0] https://github.com/grpc/grpc/issues/4629#issuecomment-376962677
>>>>>>
>>>>>> Micah
>>>>>>
>>>>>
>>>> --
>>> Rakesh Kumar
>>> Software Engineer
>>> 510-761-1364 |
>>>
>>> <https://www.lyft.com/>
>>>
>>
