[grpc-io] Re: C++: AsyncWrite constraint on completion queue

2023-05-16 Thread 'yas...@google.com' via grpc.io
I'll preface this by saying - Use the C++ callback API. Instead of trying 
to understand the Async CQ-based API, the callback API should be the choice 
and is our current recommendation.

>  Only one write is permissible per stream. So we cannot write another tag 
on a stream until we receive a response tag from the completion queue for 
the previous write.

This is correct.
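
Since the CQ-based API leaves this serialization to the application, a common 
pattern is to keep a per-stream queue and start the next Write only when the 
previous write's tag surfaces on the completion queue. Below is a minimal 
sketch of that bookkeeping in plain C++ (the class and a vector standing in 
for the actual stream->Write call are illustrative, not gRPC API):

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Sketch: enforce "one outstanding Write per stream" by queueing messages
// and starting the next write only when the previous write's tag comes back.
class StreamWriter {
 public:
  // Returns true if the message was handed to the (stand-in) stream
  // immediately, false if it was queued behind an outstanding write.
  bool Write(const std::string& msg) {
    if (write_pending_) {
      queue_.push(msg);
      return false;
    }
    StartWrite(msg);
    return true;
  }

  // Call when the previous Write's tag surfaces on the completion queue.
  void OnWriteDone() {
    write_pending_ = false;
    if (!queue_.empty()) {
      std::string next = queue_.front();
      queue_.pop();
      StartWrite(next);
    }
  }

  const std::vector<std::string>& wire() const { return wire_; }
  bool write_pending() const { return write_pending_; }

 private:
  void StartWrite(const std::string& msg) {
    write_pending_ = true;
    wire_.push_back(msg);  // stands in for stream->Write(msg, tag)
  }

  bool write_pending_ = false;
  std::queue<std::string> queue_;
  std::vector<std::string> wire_;  // messages actually handed to gRPC
};
```

OnWriteDone() would be invoked from your completion-queue loop when the 
write's tag is dequeued; only then does the next queued message go out.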

I'll end this by again saying - Use the C++ callback API.

> Recently, I came across an issue where the gRPC client became a zombie 
process as its parent Python application was aborted. In this condition, 
the previous Write done on the stream connected to that client probably did 
not get acknowledged, and I did not receive the Write tag back in the 
completion queue for that Write. My program kept waiting for the write tag 
while other messages continued to queue up, since the previous Write had 
not finished its life cycle, and hence I could not free the resources for 
that tag either.

This can be easily avoided by configuring keepalive. Refer -
1) https://github.com/grpc/grpc/blob/master/doc/keepalive.md
2) https://github.com/grpc/proposal/blob/master/A9-server-side-conn-mgt.md
3) https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md

That also answers your question about what happens if, for some reason, a 
client stops reading: keepalive would handle it.
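
For reference, the client-side settings from those docs map to channel 
arguments. This is a sketch of the relevant keys only; the values below are 
illustrative assumptions, not recommendations, and with a real gRPC build 
they would be applied through grpc::ChannelArguments::SetInt before creating 
the channel:

```cpp
#include <cstdint>
#include <map>
#include <string>

// Illustrative keepalive settings keyed by the channel-argument names
// documented in doc/keepalive.md. Values are example assumptions.
std::map<std::string, int64_t> MakeKeepaliveArgs() {
  return {
      {"grpc.keepalive_time_ms", 20000},           // ping after 20s of idle
      {"grpc.keepalive_timeout_ms", 10000},        // wait 10s for the ping ack
      {"grpc.keepalive_permit_without_calls", 1},  // ping even with no RPCs
  };
}
```

If the peer stops acking keepalive pings, the transport is torn down and any 
pending operations fail, so the stuck Write tag surfaces with ok=false 
instead of hanging forever.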

> My question is, if a write tag for a previous write does not surface on 
the completion queue, shall we wait for it indefinitely? What should be the 
strategy to handle this scenario?

Depends highly on your API/service. If for some reason the RPC is taking 
much longer than you want and you suspect that the client is being 
problematic (i.e. responding to HTTP/2 keepalives but not making progress on 
RPCs), you can always just end the RPC.
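
One way to avoid waiting indefinitely is a per-RPC watchdog: note when the 
Write was issued and, if its tag has not surfaced within a service-specific 
deadline, cancel the RPC (e.g. via ServerContext::TryCancel on the server 
side). A sketch of just the timing logic, with hypothetical names and 
millisecond timestamps passed in explicitly for testability:

```cpp
#include <cassert>
#include <cstdint>

// Sketch: track how long a Write's tag has been outstanding; poll from a
// timer and cancel the RPC once it exceeds the deadline.
class WriteWatchdog {
 public:
  explicit WriteWatchdog(int64_t deadline_ms) : deadline_ms_(deadline_ms) {}

  void OnWriteStarted(int64_t now_ms) { pending_since_ms_ = now_ms; }
  void OnWriteDone() { pending_since_ms_ = -1; }

  // Poll from a timer thread: true => give up and cancel the RPC.
  bool ShouldCancel(int64_t now_ms) const {
    return pending_since_ms_ >= 0 && now_ms - pending_since_ms_ > deadline_ms_;
  }

 private:
  int64_t deadline_ms_;
  int64_t pending_since_ms_ = -1;  // -1 => no write outstanding
};
```

Cancelling the RPC causes all of its pending tags to surface (with ok=false), 
which lets you release the resources tied to them.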

On Wednesday, May 10, 2023 at 12:17:46 AM UTC-7 Ashutosh Maheshwari wrote:

> Hello,
>
> My question is, if a write tag for a previous write does not surface on 
> the completion queue, shall we wait for it indefinitely? What should be the 
> strategy to handle this scenario?
>
> Regards
> Ashutosh
> On Wednesday, April 26, 2023 at 11:11:57 PM UTC+5:30 apo...@google.com 
> wrote:
>
>> First, it's important to clarify what it means to wait for a "Write" tag 
>> to complete on a completion queue:
>>
>> When an async "Write" is initially attempted, the message can be fully or 
>> partially buffered within gRPC. The corresponding tag will surface on the 
>> completion queue that the Write is associated with essentially after gRPC 
>> is done buffering the message, i.e. after it has written the relevant 
>> bytes out to the wire.
>>
>> This is unrelated to whether or not a "response" has been received from 
>> the peer, on the same stream.
>>
>> So, the highlighted comment means that you can only have one async write 
>> "pending" per RPC at any given time. I.e. in order to start a new write on 
>> a streaming RPC, one must wait for the previous write on that same stream 
>> to "complete" (i.e. for its tag to be surfaced).
>>
>> Multiple pending writes on different RPCs of the same completion queue 
>> are fine.
>> On Saturday, April 22, 2023 at 12:58:57 PM UTC-7 Ashutosh Maheshwari 
>> wrote:
>>
>>> Hello gRPC Team,
>>>
>>> I have taken an extract from "include/grpcpp/impl/codegen/async_stream.h":
>>>
>>>   /// Request the writing of \a msg with identifying tag \a tag.
>>>   ///
>>>   /// Only one write may be outstanding at any given time. This means that
>>>   /// after calling Write, one must wait to receive \a tag from the completion
>>>   /// queue BEFORE calling Write again.
>>>   /// This is thread-safe with respect to \a AsyncReaderInterface::Read
>>>   ///
>>>   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
>>>   /// deallocate once Write returns.
>>>   ///
>>>   /// \param[in] msg The message to be written.
>>>   /// \param[in] tag The tag identifying the operation.
>>>   virtual void Write(const W& msg, void* tag) = 0;
>>>
>>> After reading the highlighted part, I can make the following two 
>>> inferences:
>>>
>>>    1. Only one write is permissible per stream. So we cannot write 
>>>    another tag on a stream until we receive a response tag from the 
>>>    completion queue for the previous write.
>>>    2. Only one write is permissible on the completion queue, with no 
>>>    dependency on available streams. When multiple clients connect to the 
>>>    grpc server, we will have multiple streams present. In such a 
>>>    scenario, only one client can be responded to at a time due to the 
>>>    above-highlighted limitation.
>>>
>>> Can you please help us understand which of the above inferences is true?
>>>
>>> Recently,  I came across an issue where the gRPC client became a zombie 
>>> process as its parent Python application was aborted. In this condition, 

Re: [grpc-io] grpc executor threads

2023-05-16 Thread 'AJ Heller' via grpc.io
Hello all, I want to offer a quick update. tl;dr: Jeff's analysis is 
correct. The executor is legacy code at this point, slated for deletion, 
and increasingly unused.

We have been carefully replacing the legacy I/O, timer, and async execution 
implementations with a new public EventEngine API 
and its default implementations. The new thread pools do still auto-scale 
as needed - albeit with different heuristics, which are evolving as we 
benchmark - but threads are now reclaimed if/when gRPC calms down from a 
burst of activity that caused the pool to grow. Also, I believe the 
executor did not rate limit thread creation when closure queues reached 
their max depths, but the default EventEngine implementations do rate limit 
thread creation (currently capped at 1 new thread per second, but that's an 
implementation detail which may change ... some benchmarks have shown it to 
be a pretty effective rate). Beginning around gRPC v.148, you should see an 
increasing number of "event_engine" threads, and a decreasing number of 
executor threads. Ultimately we aim to unify all async activity into a 
single auto-scaling thread pool under the EventEngine.
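
The rate-limiting described above can be pictured as a small policy object: 
grow the pool only when work is queued and no thread is idle, and spawn at 
most one new thread per second. This is a simplified sketch of the idea, not 
the actual EventEngine code:

```cpp
#include <cassert>
#include <cstdint>

// Sketch of a rate-limited thread-pool growth policy (details are
// assumptions): spawn only when there is queued work, no idle thread,
// and at least one second has passed since the last spawn.
class RateLimitedScaler {
 public:
  bool MaySpawnThread(bool queue_nonempty, bool has_idle_thread,
                      int64_t now_ms) {
    if (!queue_nonempty || has_idle_thread) return false;
    if (now_ms - last_spawn_ms_ < 1000) return false;  // 1 thread/sec cap
    last_spawn_ms_ = now_ms;
    return true;
  }

 private:
  int64_t last_spawn_ms_ = -1000000;  // far in the past => first spawn allowed
};
```

A separate reclamation path (not shown) would retire threads that stay idle 
after a burst, which is the behavior change relative to the old executor.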

And since the EventEngine is a public API, any integrators that want 
complete control over thread behavior can implement their own EventEngine 
and plug it in to gRPC. gRPC will (eventually) use a provided engine for 
all async execution, timers, and I/O. Implementing an engine is not a small 
task, but it is an option people have been requesting for years. Otherwise, 
the default threading behavior provided by gRPC is tuned for performance - 
if starting a thread helps gRPC move faster, then that's what it will do.
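
The real interface to implement is grpc_event_engine::experimental::EventEngine; 
as a much-simplified illustration of the "bring your own execution" idea (the 
class and method names below are invented for the sketch and are not the 
actual API):

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Sketch only: an "engine" that accepts closures and runs them on
// caller-controlled resources. Here closures are queued and drained
// inline; a real engine would dispatch them to its own threads.
class MyEngine {
 public:
  void Run(std::function<void()> fn) { queue_.push(std::move(fn)); }

  // Drain pending closures; returns how many ran.
  int Drain() {
    int n = 0;
    while (!queue_.empty()) {
      queue_.front()();
      queue_.pop();
      ++n;
    }
    return n;
  }

 private:
  std::queue<std::function<void()>> queue_;
};
```

The point of the sketch is the inversion of control: gRPC hands work to the 
engine, and the integrator decides exactly which threads execute it and when.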

Hope this helps!
-aj

On Friday, May 12, 2023 at 4:03:58 PM UTC-7 Jiqing Tang wrote:

> Thanks so much Jeff, agree that reaping them after they've been idle would 
> be great.
>
> On Friday, May 12, 2023 at 6:59:28 PM UTC-4 Jeff Steger wrote:
>
>> This is as close to an explanation as I have found:
>>
>> look at sreecha’s response in
>> https://github.com/grpc/grpc/issues/14578
>>
>> tldr: 
>> “The max number of threads can be 2x the number of cores and 
>> unfortunately it's not configurable at the moment….. any 
>> executor threads and timer-manager you see are by-design; unless the 
>> threads are more than 2x the number of cores on your machine in which case 
>> it is clearly a bug”
>>
>>
>> From my observation of the thread count and from my examination of the 
>> grpc code (which I admit I performed some years ago), it is evident to me 
>> that the grpc framework spawns threads up to 2x the number of hardware 
>> cores. It will spawn a new thread if an existing thread in its threadpool 
>> is busy iirc. The issue is that the grpc framework never reaps idle 
threads. Once a thread is created, it is there for the lifetime of the grpc 
>> server. There is no way to configure the max number of threads either. It 
>> is really imo a sloppy design. threads aren’t free and this framework keeps 
>> (in my case) dozens and dozens of idle threads around even during long 
>> periods of low or no traffic. Maybe they fixed it in newer versions, idk. 
>>
>> On Fri, May 12, 2023 at 5:58 PM Jiqing Tang  wrote:
>>
>>> Hi Jeff and Mark,
>>>
>>> I just ran into the same issue with an async C++ gRPC server (version 
>>> 1.37.1), was curious about these default-executor threads, and then found 
>>> this thread. Did you guys figure out what these threads are for? The 
>>> number seems to be about 2x the number of polling worker threads.
>>>
>>> Thanks!
>>>
>>> On Friday, January 7, 2022 at 3:47:51 PM UTC-5 Jeff Steger wrote:
>>>
 Thanks Mark, I will turn on trace and see if I see anything odd. I was 
 reading about a function called Executor::SetThreadingDefault(bool enable) 
 that I think I can safely call after i create my grpc server. It is a 
 public function and seems to allow me to toggle between a threaded 
 implementation and an async one. Is that accurate? Is calling this function 
 safe to do and/or recommended (or at least not contra-recommended)? Thanks 
 again for your help!

 Jeff



 On Fri, Jan 7, 2022 at 11:14 AM Mark D. Roth  wrote:

> Oh, sorry, I thought you were asking about the sync server threads.  
> The default-executor threads sound like threads that are spawned 
> internally 
> inside of C-core for things like synchronous DNS resolution; those should 
> be completely unrelated to the sync server threads.  I'm not sure what 
> would cause those threads to pile up.
>
> Try running with the env vars GRPC_VERBOSITY=DEBUG GRPC_TRACE=executor 
> and see if that yields any useful log information.  In particular, try 
> running that with a debug build, sinc

[grpc-io] {method 'next_event' of 'grpc._cython.cygrpc.SegregatedCall' objects} taking >6secs on MacOS

2023-05-16 Thread Adam Walters
Hello,

Recently my program started running extremely slowly, and upon further 
investigation I found that '{method 'next_event' of 
'grpc._cython.cygrpc.SegregatedCall' objects}' was taking over 5 seconds 
per call.

I am using the latest versions of the grpc and google-ads packages.

Below is an example of the code that is running and the results from a 
cProfile run:
from google.ads.googleads.client import GoogleAdsClient
from google.ads.googleads.v13.services.services.google_ads_service import pagers
import logging

logging.basicConfig(level=logging.INFO,
                    format='[%(asctime)s - %(levelname)s] %(message).5000s')
logging.getLogger('google.ads.googleads.client').setLevel(logging.INFO)

client = GoogleAdsClient.load_from_storage("google-ads.yaml")
google_ads_service = client.get_service("GoogleAdsService", version='v13')

class GoogleQuery:
    """A class to query the Google Ads API

    Attributes:
        client: An initialized GoogleAdsClient instance.
        mcc_id: The Google Ads MCC ID
        search_accounts_query: A query to return all search accounts
        display_accounts_query: A query to return all display accounts

    Methods:
        get_existing_search_accounts: Returns a dictionary of existing search accounts
        get_existing_display_accounts: Returns a dictionary of existing display accounts
        run_gaql_query: Runs a GAQL query and returns a Pager object
    """

    def __init__(self):
        self.client = client
        self.mcc_id = "XX"
        self.search_accounts_query = """SELECT
            customer_client.id,
            customer_client.resource_name,
            customer_client.descriptive_name,
            customer_client.manager,
            customer_client.applied_labels
        FROM
            customer_client
        WHERE
            customer_client.manager = false
            AND customer_client.id IS NOT NULL
            AND customer_client.descriptive_name LIKE '%Search%'
            AND customer.status = 'ENABLED'"""

    @staticmethod
    def run_gaql_query(query: str, customer_id) -> pagers.SearchPager:
        """Runs a GAQL query through the Google Ads API

        Args:
            customer_id: customer ID being queried
            query: the actual GAQL query being run
        Returns:
            A pagers.SearchPager response of the raw data
        """
        return google_ads_service.search(
            customer_id=customer_id,
            query=query
        )

    def get_existing_search_accounts(self) -> dict:
        """Runs the search-accounts GAQL query through the Google Ads API

        Returns:
            A dict of {internal_id: account_name}
        """
        response = self.run_gaql_query(self.search_accounts_query, self.mcc_id)
        return {row.customer_client.descriptive_name[-8:]:
                row.customer_client.descriptive_name for row in response}

gaql = GoogleQuery()
gaql.get_existing_search_accounts()

# cProfile results from the above code (Tue May 16 13:31:28 2023, 
# output_test.pstats)

         991213 function calls (973122 primitive calls) in 10.099 seconds

   Ordered by: internal time
   List reduced from 6168 to 20 due to restriction <20>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    7.398    7.398    7.401    7.401 {method 'next_event' of 
'grpc._cython.cygrpc.SegregatedCall' objects}

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/00fe0078-d9f0-4234-adec-0bf8ccb930c7n%40googlegroups.com.