There are definitely some very important rules around the use of threads when using the async gRPC/Netty stuff. I don't remember them offhand. If I recall correctly, Netty's default behavior is that all work associated with a connection is done on the same thread. I think the relevant code is called HashedWheelTimer.
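[Editor's note: a minimal sketch of one way to sidestep the thread-pool pitfall David describes later in this thread: give the server an expandable executor so synchronous handlers cannot occupy every thread and starve gRPC's internal callbacks. This assumes FlightServer.Builder#executor forwards the pool to the underlying gRPC server; treat it as a sketch, not the thread's actual patch.]

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class ServerWithExecutor {
  public static void main(String[] args) throws Exception {
    try (BufferAllocator allocator = new RootAllocator()) {
      // A cached pool grows on demand, so handlers blocking inside an RPC
      // cannot exhaust a fixed capacity and leave gRPC's internal callbacks
      // (e.g. the one that sets the isCancelled flag) with no thread to run on.
      ExecutorService handlerPool = Executors.newCachedThreadPool();
      FlightServer server = FlightServer
          .builder(allocator, Location.forGrpcInsecure("0.0.0.0", 47470),
              new NoOpFlightProducer()) // placeholder producer for the sketch
          .executor(handlerPool)
          .build();
      server.start();
      server.awaitTermination();
      handlerPool.shutdown();
    }
  }
}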
Note, this all goes back to when I wrote the original code for backpressure and the associated tests, so memory has definitely faded.

On Fri, Mar 11, 2022, 6:20 PM James Duong <[email protected]> wrote:

> This was patching the application code (the FlightProducer implementation).
>
> I wouldn't say this happened under heavy load, but it would consistently happen with queries that returned enough data in one chunk that the client would need to report itself busy.
>
> The behavior you're describing seems to indicate that the handlers should run on any thread available to gRPC, but my observation was that the handler would run on the thread that getStream was called on. I haven't looked through to see whether that is the case, though.
>
> On Fri., Mar. 11, 2022, 17:47 David Li <[email protected]> wrote:
>
>> Hmm, you mean patching Flight itself? Or the application code? (Sounds like the latter?)
>>
>> Just curious - were you seeing this hanging only under load, and with a fixed thread pool configured on the gRPC server? There is a pitfall there due to how gRPC is implemented: certain internal callbacks, including the one that sets the isCancelled flag IIRC, are run on the same thread pool as RPC handlers, so if 1) your RPC handlers are synchronous, 2) your thread pool has fixed capacity, and 3) all threads are servicing a call, then they can get stuck forever because the gRPC internal callbacks can never run.
>>
>> On Fri, Mar 11, 2022, at 20:11, James Duong wrote:
>>
>> Following up on this, we were able to get past the hanging issue by changing the FlightProducer getStream() implementation to send its work (including the blocking on CallbackBackpressureStrategy) to a background thread instead of the gRPC thread running it.
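[Editor's note: a sketch of the workaround James describes above, moving the blocking loop off the gRPC thread. NoOpFlightProducer and BackpressureStrategy.CallbackBackpressureStrategy are the real Arrow Flight classes discussed in this thread, while workerPool and produceInto are illustrative names; this is one way to structure the offload, not necessarily the exact patch.]

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.arrow.flight.BackpressureStrategy;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.Ticket;

public class OffloadingProducer extends NoOpFlightProducer {
  // Pool for the producer's own work; illustrative, not part of Flight.
  private final ExecutorService workerPool = Executors.newCachedThreadPool();

  @Override
  public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
    // Return to gRPC immediately; do the blocking work elsewhere so the
    // gRPC thread stays free to run internal callbacks (isCancelled, onReady).
    workerPool.submit(() -> {
      BackpressureStrategy backpressure = new BackpressureStrategy.CallbackBackpressureStrategy();
      backpressure.register(listener); // hooks the listener's ready/cancel callbacks
      try {
        produceInto(listener, backpressure);
        listener.completed();
      } catch (Exception e) {
        listener.error(e);
      }
    });
  }

  // Application-specific and hypothetical: start() the listener with a root,
  // then for each batch wait for WaitResult.READY before calling putNext().
  private void produceInto(ServerStreamListener listener, BackpressureStrategy backpressure) {
  }
}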
>> On Tue, Mar 8, 2022 at 4:26 PM James Duong <[email protected]> wrote:
>>
>> We see backpressure-related timeouts. I'm thinking there's an issue with CallbackBackpressureStrategy relying on ServerStreamListener#isReady(). I've created https://issues.apache.org/jira/browse/ARROW-15876 for this.
>>
>> We're going to try a fix for this locally and, if it helps, create a PR.
>>
>> On Mon, Mar 7, 2022 at 4:19 PM David Li <[email protected]> wrote:
>>
>> So you're finding that if you remove the backpressure handler, there are no problems?
>>
>> Is the timeout a gRPC timeout? Do you know if any messages are making it through, or is it timing out after a period of no activity at all?
>>
>> On Mon, Mar 7, 2022, at 19:12, Alex McRae (CW) wrote:
>>
>> Hi David,
>>
>> We believe we have a data race somewhere, as debugging the code above causes no issues but running it without the debugger causes a timeout. We were trying to investigate whether putNext kept around a reference to the data in the VectorSchemaRoot. Given that we are getting a timeout with the backpressure, we think it is possible the code at https://github.com/dremio-hub/arrow-flight-client-examples/tree/main/java may be the culprit.
>>
>> On Fri, Mar 4, 2022 at 2:35 PM David Li <[email protected]> wrote:
>>
>> It should be safe. Are you seeing any issues?
>>
>> Flight waits for an explicit next()/putNext() to actually touch anything. And once they return, Flight will not read or mutate your data. So for instance, calling putNext() copies the Arrow data into a gRPC buffer, after which you can reuse the Arrow buffers. (This is *not* true if you have zero-copy writes enabled. In that case we poke a reference to the Arrow buffers into the gRPC data structure, and so mutating your Arrow buffer will mysteriously change "previously written" data.)
>>
>> It's been years since I've touched this, though, so the details here are fuzzy to me...
>>
>> On Fri, Mar 4, 2022, at 16:38, Alex McRae (CW) wrote:
>>
>> Hi all,
>>
>> Related to the issue and solution below, we were wondering whether it is safe to call VectorLoader.load() before checking if a client is ready when using a backpressure strategy. The thinking is that the client may still be reading data from the root, and calling load() may cause a data race.
>>
>> public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
>>   final FlightStream flightStream = this.client.getStream(ticket);
>>
>>   VectorSchemaRoot clientRoot = flightStream.getRoot();
>>   VectorUnloader vectorUnloader = new VectorUnloader(clientRoot);
>>
>>   final VectorSchemaRoot root = new VectorSchemaRoot(clientRoot.getSchema(),
>>       clientRoot.getFieldVectors(), clientRoot.getRowCount());
>>   final VectorLoader vectorLoader = new VectorLoader(root);
>>   listener.start(root, flightStream.getDictionaryProvider());
>>
>>   while (flightStream.next()) {
>>     if (!flightStream.hasRoot()) {
>>       break;
>>     }
>>
>>     if (flightStream.getRoot() != clientRoot) {
>>       clientRoot = flightStream.getRoot();
>>       vectorUnloader = new VectorUnloader(clientRoot);
>>     }
>>
>>     // Is this safe to happen before the client is ready?
>>     vectorLoader.load(vectorUnloader.getRecordBatch());
>>
>>     // This uses the built-in CallbackBackpressureStrategy.
>>     final BackpressureStrategy.WaitResult waitResult = backpressureStrategy.waitForListener(timeout);
>>     if (waitResult == BackpressureStrategy.WaitResult.READY) {
>>       listener.putNext();
>>     }
>>   }
>>
>>   listener.completed();
>> }
>>
>> Let me know what you think.
>>
>> Sincerely,
>>
>> Alex McRae
>>
>> On Feb 28, 2022, at 10:39 AM, Alex McRae (CW) <[email protected]> wrote:
>>
>> Absolutely!
>>
>> On Mon, Feb 28, 2022 at 10:06 AM David Li <[email protected]> wrote:
>>
>> Just to make sure this doesn't get forgotten: I filed https://github.com/apache/arrow-cookbook/issues/158 for providing an example of this.
>>
>> -David
>>
>> On Tue, Feb 15, 2022, at 13:54, David Li wrote:
>>
>> It should be safe. The fast-read path has pretty much always been enabled, and I'm not aware of issues it causes. (The fast-read path simply calls an internal gRPC method to avoid bouncing through byte[], but we're still copying the data into Arrow, now that I look at it.)
>>
>> The fast-write path is not relevant here, but that's the one that is trickier to use. We should make sure the optimization(s) are properly documented, since, looking through, it's not really explained what the consequences are (or at least the flag in ArrowMessage should reference setUseZeroCopy, and we should have a doc page for these env vars analogous to ARROW-15617 for C++).
>>
>> On a side note, digging around to refresh my memory shows that gRPC Java *finally* introduced a zero-copy Protobuf deserialization path. I'm not sure it's quite relevant for us, since we still need to get the data into an off-heap buffer in the end, but I need to take a closer look. (See grpc/grpc-java#8102.)
>>
>> -David
>>
>> On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
>>
>> Thanks for the tip, David.
>>
>> Do you know if zero copy can be used safely on the ServerStreamListener when using the VectorUnloader/Loader pattern above?
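[Editor's note: a hedged sketch tying that question to David's answer above. With the default copying writes, putNext() copies into a gRPC buffer, so reusing one root with the Unloader/Loader pattern is safe; with zero-copy writes, gRPC keeps a reference to the Arrow buffers, so reloading the root can silently change batches that were already "written". This assumes OutboundStreamListener#setUseZeroCopy is the toggle the ArrowMessage flag refers to, and that it defaults to off.]

import org.apache.arrow.flight.FlightProducer.ServerStreamListener;
import org.apache.arrow.vector.VectorSchemaRoot;

// Inside a producer; root is the single VectorSchemaRoot reused across batches.
void configureForRootReuse(ServerStreamListener listener, VectorSchemaRoot root) {
  // Copying writes (assumed to be the default) let us overwrite the root as
  // soon as putNext() returns; stated explicitly here for illustration only.
  listener.setUseZeroCopy(false);
  listener.start(root);
  // ... per batch: load the root, wait for READY, then listener.putNext() ...
}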
>> On Mon, Feb 14, 2022 at 9:38 AM David Li <[email protected]> wrote:
>>
>> Hey Alex,
>>
>> Basically, you should call start() exactly once; as you noticed, it sends the initial schema message.
>>
>> If the VectorSchemaRoot is not stable, what you should do is create your own root with the same schema, and use VectorUnloader/VectorLoader to transfer data from the source root to the root used by Flight.
>>
>> Does that make sense? This would be good to add to the Arrow Java cookbook (at least, the VectorLoader/Unloader part).
>>
>> -David
>>
>> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>>
>> Hi team,
>>
>> We are currently building a Flight service which proxies requests in Java, and we are getting getStream working on the FlightProducer.
>>
>> The code looks similar to this:
>>
>> public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
>>   FlightStream flightStream = this.client.getStream(ticket);
>>   while (flightStream.next()) {
>>     if (!flightStream.hasRoot()) {
>>       break;
>>     }
>>
>>     listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>>     listener.putNext();
>>   }
>>
>>   listener.completed();
>> }
>>
>> We are running into issues understanding whether this is valid usage. I have looked at the OutboundStreamListenerImpl.java file, and it looks like calling start() on the listener causes it to resend some schema messages. We are trying to understand how to handle the case where flightStream.getRoot() returns a different VectorSchemaRoot than the previous call.
>>
>> For more context, we have also tried:
>>
>> public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
>>   FlightStream flightStream = this.client.getStream(ticket);
>>   listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>>   while (flightStream.next()) {
>>     if (!flightStream.hasRoot()) {
>>       break;
>>     }
>>
>>     listener.putNext();
>>   }
>>   listener.completed();
>> }
>>
>> But we ran into issues with the connection not closing, which we believe to be due to the VectorSchemaRoot changing on flightStream.next() calls. We believe this is an issue because we are sharing the root with both the FlightStream and the ServerStreamListener. https://github.com/dremio-hub/arrow-flight-client-examples is the client we are using to test this end to end.
>>
>> Please let me know if you can provide any clarity; I would be happy to update the documentation afterwards.
>>
>> Sincerely,
>> Alex McRae
>> [email protected]
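[Editor's note: for reference, the core of the pattern David recommends, distilled from the working version earlier in the thread. A sketch, assuming flightStream, listener, and allocator are already in scope inside getStream. It creates the server-side root with VectorSchemaRoot.create(), giving it its own vectors, rather than passing the source root's FieldVectors to the constructor, which would share buffers with the FlightStream and defeat the purpose.]

import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

// Keep one stable root for the listener; copy each incoming batch into it,
// so the root handed to start() never changes identity mid-stream.
try (VectorSchemaRoot serverRoot =
    VectorSchemaRoot.create(flightStream.getRoot().getSchema(), allocator)) {
  VectorLoader loader = new VectorLoader(serverRoot);
  listener.start(serverRoot, flightStream.getDictionaryProvider());

  while (flightStream.next()) {
    // Re-create the unloader each iteration in case the stream swapped roots.
    VectorUnloader unloader = new VectorUnloader(flightStream.getRoot());
    try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
      loader.load(batch); // transfers the batch into serverRoot's vectors
    }
    listener.putNext();
  }
  listener.completed();
}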
