There are definitely some very important rules around the use of threads
when using the async gRPC/Netty stuff. I don't remember them offhand. If I
recall correctly, Netty's default behavior is that all work associated with
a connection is done on the same thread. I think the relevant code is called
HashedWheelTimer.

Note, this all goes back to when I wrote the original code for backpressure
and the associated tests, so my memory has definitely faded.

On Fri, Mar 11, 2022, 6:20 PM James Duong <[email protected]> wrote:

> This was patching the application code (the FlightProducer implementation).
>
> I wouldn't say this happened under heavy load, but it would consistently
> happen with queries that returned enough data in one chunk that the client
> would need to report itself as busy.
>
> The behavior you're describing seems to indicate that the handlers should
> run on any thread available to gRPC, but my observation was that the
> handler would run on the thread that getStream() was called on. I haven't
> looked through the code to see if that is the case, though.
>
> On Fri., Mar. 11, 2022, 17:47 David Li, <[email protected]> wrote:
>
>> Hmm, you mean patching Flight itself? Or the application code? (Sounds
>> like the latter?)
>>
>> Just curious - were you seeing this hanging only under load, and with a
>> fixed thread pool configured on the gRPC server? There is a pitfall there
>> due to how gRPC is implemented (certain internal callbacks, including the
>> one that sets the isCancelled flag IIRC, are run on the same thread pool as
>> RPC handlers, so if 1) your RPC handlers are synchronous, 2) your thread
>> pool has fixed capacity, and 3) all threads are servicing a call, then they
>> can get stuck forever because the gRPC internal callbacks can never run).
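>>
>> To be concrete, the shape of the fix is usually either to give the server an
>> executor that can grow, or to keep the handlers from blocking. A minimal
>> sketch of the first option, assuming your Arrow version's FlightServer.Builder
>> exposes executor(...) (MyProducer is a stand-in for your FlightProducer):
>>
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>>
>> import org.apache.arrow.flight.FlightServer;
>> import org.apache.arrow.flight.Location;
>> import org.apache.arrow.memory.BufferAllocator;
>> import org.apache.arrow.memory.RootAllocator;
>>
>> public class ServerSetup {
>>   public static void main(String[] args) throws Exception {
>>     BufferAllocator allocator = new RootAllocator();
>>     // A pool that can grow; a fixed pool whose threads are all stuck in
>>     // synchronous handlers is what starves the gRPC internal callbacks.
>>     ExecutorService exec = Executors.newCachedThreadPool();
>>     FlightServer server = FlightServer.builder(allocator,
>>             Location.forGrpcInsecure("localhost", 32010),
>>             new MyProducer())  // MyProducer: your FlightProducer implementation
>>         .executor(exec)
>>         .build();
>>     server.start();
>>     server.awaitTermination();
>>   }
>> }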
>>
>> On Fri, Mar 11, 2022, at 20:11, James Duong wrote:
>>
>> Following up on this, we were able to get past the hanging issue by
>> changing the FlightProducer getStream() implementation to send its work
>> (including the blocking on CallbackBackpressureStrategy) to a background
>> thread instead of the gRPC thread running it.
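>>
>> Sketched out, the change inside our FlightProducer implementation was
>> roughly this (streamResults() is a placeholder for the existing
>> load/backpressure loop, not an Arrow API):
>>
>> // Dedicated pool so the gRPC thread that invoked getStream() returns
>> // immediately instead of blocking on the backpressure strategy.
>> private final ExecutorService streamExecutor = Executors.newCachedThreadPool();
>>
>> @Override
>> public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
>>   streamExecutor.submit(() -> {
>>     try {
>>       streamResults(ticket, listener);  // the existing blocking loop
>>     } catch (Exception e) {
>>       listener.error(e);
>>     }
>>   });
>> }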
>>
>> On Tue, Mar 8, 2022 at 4:26 PM James Duong <[email protected]>
>> wrote:
>>
>> We see backpressure-related timeouts. I'm thinking there's an issue with
>> CallbackBackpressureStrategy relying on ServerStreamListener#isReady().
>> I've created https://issues.apache.org/jira/browse/ARROW-15876 for this.
>>
>> We're going to try a fix for this locally then if it helps create a PR.
>>
>> On Mon, Mar 7, 2022 at 4:19 PM David Li <[email protected]> wrote:
>>
>>
>> So you're finding that if you remove the backpressure handler, there are
>> no problems?
>>
>> Is the timeout a gRPC timeout? Do you know if any messages are making it
>> through, or is it timing out after a period of no activity at all?
>>
>> On Mon, Mar 7, 2022, at 19:12, Alex McRae (CW) wrote:
>>
>> Hi David,
>>
>> We believe we have a data race somewhere, as stepping through the code
>> above in a debugger causes no issues, but running it without the debugger
>> causes a timeout. We were trying to investigate whether putNext() kept a
>> reference to the data in the VectorSchemaRoot. Given that we are getting a
>> timeout with backpressure, we think it is possible that the client code at
>> https://github.com/dremio-hub/arrow-flight-client-examples/tree/main/java
>> may be the culprit.
>>
>> On Fri, Mar 4, 2022 at 2:35 PM David Li <[email protected]> wrote:
>>
>>
>> It should be safe. Are you seeing any issues?
>>
>> Flight waits for an explicit next()/putNext() to actually touch anything.
>> And once they return, Flight will not read or mutate your data. So for
>> instance, calling putNext() copies the Arrow data into a gRPC buffer, after
>> which you can reuse the Arrow buffers. (This is *not* true if you have
>> zero-copy writes enabled. In that case we poke a reference to the Arrow
>> buffers into the gRPC data structure and so mutating your Arrow buffer will
>> mysteriously change "previously written" data.)
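>>
>> In other words, with the default write path a helper along these lines is
>> safe (BatchSender/sendOneBatch are just illustrative names, not Arrow APIs):
>>
>> import org.apache.arrow.flight.FlightProducer.ServerStreamListener;
>> import org.apache.arrow.vector.VectorLoader;
>> import org.apache.arrow.vector.VectorUnloader;
>> import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
>>
>> final class BatchSender {
>>   static void sendOneBatch(VectorUnloader unloader, VectorLoader loader,
>>                            ServerStreamListener listener) {
>>     try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
>>       loader.load(batch);   // fill the root passed to listener.start()
>>       listener.putNext();   // default path: buffers are copied into gRPC here
>>     }
>>     // After putNext() returns (and without zero-copy writes), the root can
>>     // safely be refilled or mutated for the next batch.
>>   }
>> }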
>>
>> It's been years since I've touched this, though, so the details here are
>> fuzzy to me...
>>
>> On Fri, Mar 4, 2022, at 16:38, Alex McRae (CW) wrote:
>>
>> Hi all,
>>
>> Related to this issue and the solution below, we were wondering if it is
>> safe to call VectorLoader.load() before checking whether the client is
>> ready when using a backpressure strategy. The thinking is that the client
>> may still be reading data from the root, and calling load() may cause a
>> data race.
>>
>> public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
>>       final FlightStream flightStream = this.client.getStream(ticket);
>>
>>       VectorSchemaRoot clientRoot = flightStream.getRoot();
>>       VectorUnloader vectorUnloader = new VectorUnloader(clientRoot);
>>
>>       final VectorSchemaRoot root = new VectorSchemaRoot(
>>           clientRoot.getSchema(), clientRoot.getFieldVectors(), clientRoot.getRowCount());
>>       final VectorLoader vectorLoader = new VectorLoader(root);
>>       listener.start(root, flightStream.getDictionaryProvider());
>>
>>       while (flightStream.next()) {
>>         if (!flightStream.hasRoot()) {
>>           break;
>>         }
>>
>>         if (flightStream.getRoot() != clientRoot) {
>>           clientRoot = flightStream.getRoot();
>>           vectorUnloader = new VectorUnloader(clientRoot);
>>         }
>>
>>         // is this safe to happen before the client is ready?
>>         vectorLoader.load(vectorUnloader.getRecordBatch());
>>
>>         // this uses the built-in CallbackBackpressureStrategy
>>         final BackpressureStrategy.WaitResult waitResult =
>>             backpressureStrategy.waitForListener(timeout);
>>         if (waitResult == BackpressureStrategy.WaitResult.READY) {
>>           listener.putNext();
>>         }
>>       }
>>
>>       listener.completed();
>> }
>>
>> Let me know what you think.
>>
>> Sincerely,
>>
>> Alex McRae
>>
>> On Feb 28, 2022, at 10:39 AM, Alex McRae (CW) <[email protected]>
>> wrote:
>>
>> Absolutely!
>>
>> On Mon, Feb 28, 2022 at 10:06 AM David Li <[email protected]> wrote:
>>
>>
>> Just to make sure this doesn't get forgotten: I filed
>> https://github.com/apache/arrow-cookbook/issues/158 for providing an
>> example of this.
>>
>> -David
>>
>> On Tue, Feb 15, 2022, at 13:54, David Li wrote:
>>
>> It should be safe. The fast-read path has pretty much always been enabled
>> and I'm not aware of issues it causes. (The fast-read path simply calls an
>> internal gRPC method to avoid bouncing through byte[], but we're still
>> copying the data into Arrow, now that I look at it.)
>>
>> The fast-write path is not relevant here, but that's the one that is
>> trickier to use. We should make sure the optimization(s) are properly
>> documented, since looking through the code it's not really explained what
>> the consequences are (or at least the flag in ArrowMessage should reference
>> setUseZeroCopy, and we should have a doc page for these env vars analogous
>> to ARROW-15617 for C++).
>>
>> On a side note, digging around to refresh my memory shows that gRPC Java
>> *finally* introduced a zero-copy Protobuf deserialization path. I'm not
>> sure it's quite relevant for us, since we still need to get the data into
>> an off-heap buffer in the end, but I need to take a closer look. (See
>> grpc/grpc-java#8102.)
>>
>> -David
>>
>> On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
>>
>> Thanks for the tip David.
>>
>> Do you know if zero copy can be used safely on the ServerStreamListener
>> when using the VectorUnloader/Loader pattern above?
>>
>> On Mon, Feb 14, 2022 at 9:38 AM David Li <[email protected]> wrote:
>>
>>
>> Hey Alex,
>>
>> Basically, you should call start() exactly once; as you noticed, it sends
>> the initial schema message.
>>
>> If the VectorSchemaRoot is not stable, what you should do is create your
>> own root with the same schema, and use VectorUnloader/VectorLoader to
>> transfer data from the source root to the root used by Flight.
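>>
>> A rough sketch of that pattern (sourceRoot, allocator, and hasMoreData() are
>> placeholders for whatever you already have):
>>
>> // One stable root is registered with Flight; each incoming batch is copied
>> // into it, so it doesn't matter if the source root changes underneath you.
>> VectorSchemaRoot flightRoot = VectorSchemaRoot.create(sourceRoot.getSchema(), allocator);
>> VectorLoader loader = new VectorLoader(flightRoot);
>> listener.start(flightRoot);
>>
>> while (hasMoreData()) {                 // e.g. flightStream.next()
>>   VectorUnloader unloader = new VectorUnloader(sourceRoot);
>>   try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
>>     loader.load(batch);                 // copies the batch into flightRoot
>>   }
>>   listener.putNext();
>> }
>> listener.completed();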
>>
>> Does that make sense? This would be good to add to the Arrow Java
>> cookbook (at least, the VectorLoader/Unloader part).
>>
>> -David
>>
>> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>>
>> Hi team,
>>
>> We are building a Flight service in Java which proxies requests. We are
>> currently getting getStream() working on the FlightProducer.
>>
>> The code looks similar to this:
>>
>> public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
>>     FlightStream flightStream = this.client.getStream(ticket);
>>     while (flightStream.next()) {
>>         if (!flightStream.hasRoot()) { break; }
>>
>>         listener.start(flightStream.getRoot(),
>>             flightStream.getDictionaryProvider());
>>         listener.putNext();
>>     }
>>
>>     listener.completed();
>> }
>>
>>
>> We are having trouble understanding whether this is valid usage. I have
>> looked at OutboundStreamListenerImpl.java, and it looks like calling
>> start() on the listener causes it to resend the schema message. We are
>> trying to understand how to handle the case where flightStream.getRoot()
>> returns a different VectorSchemaRoot than the previous call.
>>
>> For more context, we have also tried:
>>
>> public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
>>     FlightStream flightStream = this.client.getStream(ticket);
>>     listener.start(flightStream.getRoot(),
>>         flightStream.getDictionaryProvider());
>>     while (flightStream.next()) {
>>         if (!flightStream.hasRoot()) { break; }
>>
>>         listener.putNext();
>>     }
>>     listener.completed();
>> }
>>
>> But we ran into issues with the connection not closing; we believe this is
>> due to the VectorSchemaRoot changing across flightStream.next() calls. We
>> believe this is an issue because we are sharing the root between the
>> FlightStream and the ServerStreamListener.
>> https://github.com/dremio-hub/arrow-flight-client-examples is the client
>> we are using to test this end to end.
>>
>> Please let me know if you can provide any clarity; I would be happy to
>> update the documentation afterwards.
>>
>> Sincerely,
>> Alex McRae
>> [email protected]