Hi Ahmet,

I filed the JIRA ticket https://issues.apache.org/jira/browse/BEAM-5497.
Let me know if you need anything else from us.

Thank you,
Rakesh

On Mon, Sep 24, 2018 at 5:08 PM Ahmet Altay <al...@google.com> wrote:

> On Sun, Sep 23, 2018 at 9:21 PM, Rakesh Kumar <rakeshku...@lyft.com>
> wrote:
>
>> Thanks, Ahmet, for providing the reference code. I will give it a try.
>>
>> I also read through the code, and it looks like you are using the
>> multiprocessing package to parallelize work at runtime. We wanted to use
>> gevent because it is lightweight and good for parallelizing IO/network-bound jobs.
>>
>
> We are using this code for IO-bound operations. For example, in [1] it is
> used to make calls to GCS in parallel, with batches of files.
>
> [1]
> https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313
>
>
>> I would also recommend adding gevent support in the future, because it
>> uses resources efficiently and scales well under heavy load.
>>
>
> Do you mind filing a JIRA for the gevent issue so that we can keep track
> of it?
>
>
>>
>> On Fri, Sep 21, 2018 at 5:58 PM Ahmet Altay <al...@google.com> wrote:
>>
>>> Thank you for the example; it helps.
>>>
>>> I still do not know what is wrong with gevent. Would you consider using
>>> the multiprocessing package? We already use it to accomplish something
>>> similar in file-based sinks, and there is already a utility function that
>>> wraps it, similar to your example [1].
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117
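For comparison, here is a minimal sketch of the same fan-out using `multiprocessing.pool.ThreadPool` from the Python standard library. Despite its package name, `ThreadPool` runs work on threads rather than processes, which is what IO-bound calls need. We assume, without having checked, that the Beam helper at [1] wraps something similar. `_call_users_service`, `Response` and `BATCH_SIZE` below are hypothetical stand-ins, and the batching is plain slicing rather than `utils.chunk_list_evenly`.

```python
from collections import namedtuple
from multiprocessing.pool import ThreadPool

# Hypothetical response type and batch size, for illustration only.
Response = namedtuple("Response", ["user_id"])
BATCH_SIZE = 2


def _call_users_service(user_ids):
    # Hypothetical stub: a real implementation would make the network call.
    # Here it just reports the first ID of the batch as valid.
    return Response(user_id=user_ids[0])


def filter_out_invalid_users(events):
    _key, user_id_data_pairs = events
    user_ids = [user_id for user_id, _ in user_id_data_pairs]
    # Split the IDs into consecutive batches of at most BATCH_SIZE.
    chunks = [user_ids[i:i + BATCH_SIZE]
              for i in range(0, len(user_ids), BATCH_SIZE)]
    if not chunks:
        # ThreadPool needs at least one worker, so handle empty input here.
        yield []
        return
    pool = ThreadPool(min(len(chunks), 8))
    try:
        # map() blocks until every batch is processed, returns results in
        # input order, and re-raises the first exception from a worker.
        responses = pool.map(_call_users_service, chunks)
    finally:
        pool.terminate()
    yield [response.user_id for response in responses]
```

Note the design difference from the gevent version: `pool.map` has no overall timeout, and an exception in any batch is re-raised in the caller instead of that batch being silently dropped.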
>>>
>>> Ahmet
>>>
>>>
>>> On Wed, Sep 19, 2018 at 10:25 PM, Rakesh Kumar <rakeshku...@lyft.com>
>>> wrote:
>>>
>>>>
>>>> Gevent <http://www.gevent.org/> lets us make network calls in parallel.
>>>> We use it in one of our transformation methods, which calls internal
>>>> services and makes several network calls in parallel. Here is the code
>>>> snippet:
>>>> /__init__.py
>>>> import gevent.monkey
>>>> gevent.monkey.patch_all()
>>>>
>>>> /transform.py
>>>> from gevent import Greenlet, joinall
>>>>
>>>> # utils, BATCH_SIZE and GREENLET_TIMEOUT are defined elsewhere (not shown).
>>>>
>>>> def filter_out_invalid_users(events):
>>>>     key, user_id_data_pairs = events
>>>>     user_ids = [user_id for user_id, data in user_id_data_pairs]
>>>>
>>>>     # Spawn one greenlet per chunk of IDs; _call_users_service makes
>>>>     # the network call.
>>>>     jobs = []
>>>>     id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
>>>>     for id_chunk in id_chunks:
>>>>         jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))
>>>>
>>>>     # Increase the timeout with the number of greenlets we are running,
>>>>     # to account for yielding among greenlets.
>>>>     join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
>>>>     joinall(jobs, timeout=join_timeout)
>>>>
>>>>     # Keep only greenlets that finished without raising.
>>>>     successful_jobs = [job for job in jobs if job.successful()]
>>>>     valid_user_ids = []
>>>>     for job in successful_jobs:
>>>>         network_response = job.get()
>>>>         valid_user_ids.append(network_response.user_id)
>>>>     yield valid_user_ids
>>>>
>>>> def _call_users_service(user_ids):
>>>>     # Make the network call and return the response.
>>>>     ..
>>>>     ..
>>>>     return network_response
>>>>
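The snippet depends on two details it does not show. Below is a minimal sketch under our own assumptions: `chunk_list_evenly` is a hypothetical stand-in for `utils.chunk_list_evenly` that splits a list into the fewest chunks of at most `batch_size` items, with sizes differing by at most one, and `join_timeout` simply restates the snippet's formula, `GREENLET_TIMEOUT * (1 + 0.1 * len(jobs))`.

```python
import math


def chunk_list_evenly(items, batch_size):
    """Hypothetical stand-in for utils.chunk_list_evenly.

    Splits `items` into ceil(len(items) / batch_size) consecutive chunks
    whose sizes differ by at most one, so no chunk exceeds batch_size.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if not items:
        return []
    num_chunks = math.ceil(len(items) / batch_size)
    base, extra = divmod(len(items), num_chunks)
    chunks, start = [], 0
    for i in range(num_chunks):
        # The first `extra` chunks get one additional item.
        size = base + (1 if i < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


def join_timeout(greenlet_timeout, num_jobs):
    # Same formula as the snippet: base timeout plus 10% of it per greenlet.
    return greenlet_timeout + num_jobs * greenlet_timeout * 0.1
```

For example, with a 10-second `GREENLET_TIMEOUT` and 5 greenlets, `joinall` waits at most 10 + 5 * 10 * 0.1 = 15 seconds.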
>>>> On Tue, Sep 18, 2018 at 7:07 PM Ahmet Altay <al...@google.com> wrote:
>>>>
>>>>> I am also not familiar with gevent. Could you explain what you are
>>>>> trying to do and how you plan to use gevent?
>>>>>
>>>>> On Tue, Sep 18, 2018 at 9:38 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> I don't think anyone has tried what you're doing. The code you're
>>>>>> working with is very new.
>>>>>>
>>>>>> On Mon, Sep 17, 2018 at 5:02 PM Micah Wylde <mwy...@lyft.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> We're using the Python SDK with the portable Flink runner and are
>>>>>>> running into problems integrating gevent. We're patching the gRPC
>>>>>>> runtime for gevent as described in [0], which allows pipelines to start
>>>>>>> and partially run. However, the tasks produce a stream of gevent
>>>>>>> exceptions:
>>>>>>>
>>>>>>> Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
>>>>>>> Traceback (most recent call last):
>>>>>>>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>>>>>>>   File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
>>>>>>>   File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
>>>>>>>   File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>   File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
>>>>>>> greenlet.error: cannot switch to a different thread
>>>>>>>
>>>>>>> and do not make any progress.
>>>>>>>
>>>>>>> Has anybody else successfully used the portable Python SDK with
>>>>>>> gevent? Or is there a recommended alternative for doing async IO in
>>>>>>> Python pipelines?
>>>>>>>
>>>>>>> [0] https://github.com/grpc/grpc/issues/4629#issuecomment-376962677
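One standard-library option for this kind of IO fan-out, sketched here under assumptions and not tested with the portable Flink runner, is `concurrent.futures.ThreadPoolExecutor`. It needs no monkey-patching, so no greenlets are involved. The sketch mirrors the gevent snippet's semantics: wait up to an overall timeout, then keep only the calls that finished without raising. `call_all`, `call_service` and `max_workers=8` are hypothetical names and values.

```python
import concurrent.futures as cf


def call_all(call_service, batches, timeout, max_workers=8):
    """Run call_service on each batch in parallel threads (hypothetical helper).

    Waits at most `timeout` seconds overall and returns, in input order, the
    results of calls that completed without raising. Like gevent's
    joinall(timeout=...), calls still running at the deadline are dropped.
    """
    if not batches:
        return []
    executor = cf.ThreadPoolExecutor(max_workers=min(max_workers, len(batches)))
    futures = [executor.submit(call_service, batch) for batch in batches]
    try:
        done, _ = cf.wait(futures, timeout=timeout)
        return [f.result() for f in futures
                if f in done and f.exception() is None]
    finally:
        for f in futures:
            f.cancel()  # Only succeeds for calls that have not started yet.
        # Do not block on stragglers; they keep running in the background.
        executor.shutdown(wait=False)
```

Unlike greenlets, each in-flight call here occupies a real OS thread, so `max_workers` bounds the concurrency; calls still running at the deadline are not interrupted, only ignored.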
>>>>>>>
>>>>>>> Micah
>>>>>>>
>>>>>>
>>>>> --
>>>> Rakesh Kumar
>>>> Software Engineer
>>>> 510-761-1364
>>>>
>>>> <https://www.lyft.com/>
>>>>
>>>
>>