gortiz opened a new pull request, #17613: URL: https://github.com/apache/pinot/pull/17613
In SSE, brokers execute the reduce phase, which merges data from different servers. The key point of MSE is to have several map-reduce phases, where each reduce is executed on one or more servers. This means that in MSE the broker either receives data from a single server (in which case the data is ready to be sent) or from multiple servers; in the latter case, the final result is guaranteed to be the concatenation of the data from each stage-1 server.

For historical reasons, the broker reads all data sent from stage 1, concatenates it into a Java heap structure, and then serializes it into the format sent to the customer (usually JSON, though we recently added gRPC as well). This means we need to allocate and retain several copies of the requested data for a while, which can be an issue for queries that return a large amount of data.

There is no technical reason to implement brokers this way. Given the properties described in the first paragraph, brokers can simply pipeline the received data to the customer. This reduces the memory cost of running MSE queries in brokers from linear in the number of rows returned to a very low constant, and it should also reduce broker latency for large queries.

As a side effect, the behavior in case of errors changes slightly. Currently, we return either result rows or an error (without rows). If one server finishes successfully while another fails, the result from a streaming broker may include some rows followed by an error.

As a picture speaks a thousand words, here is a screenshot of a request in master:

<img width="1710" height="832" alt="image" src="https://github.com/user-attachments/assets/fb39d88c-ceaf-4138-aac1-010c823ef13d" />

And here is the same request in this branch. The query runs a bit faster, but as shown above, the performance may not be that different; the important part is that the customer starts receiving the results earlier.
<img width="1710" height="832" alt="image" src="https://github.com/user-attachments/assets/f6dad376-5fc3-4be3-bed2-17d9d6624603" />

This PR also includes a benchmark that executes a simple `select *` query. The difference in raw numbers is not that large because we still need to pay the server-side cost, but the MB/op metric (`gc.alloc.rate.norm`) and especially the total GC time are significantly better:

```
Benchmark                                                (_segments)  Mode  Cnt     Score      Error  Units
BenchmarkBrokerSerializer.projectAll                              10  avgt    5   378.497 ±  130.619  ms/op
BenchmarkBrokerSerializer.projectAll:gc.alloc.rate                10  avgt    5  5918.328 ± 5726.710  MB/sec
BenchmarkBrokerSerializer.projectAll:gc.alloc.rate.norm           10  avgt    5   2394.66 ±    11.81  MB/op
BenchmarkBrokerSerializer.projectAll:gc.count                     10  avgt    5    36.000             counts
BenchmarkBrokerSerializer.projectAll:gc.time                      10  avgt    5   790.000             ms

Benchmark                                                (_segments)  Mode  Cnt     Score      Error  Units
BenchmarkBrokerSerializer.projectAll                              10  avgt    5   382.165 ±    7.528  ms/op
BenchmarkBrokerSerializer.projectAll:gc.alloc.rate                10  avgt    5  6066.623 ± 4853.627  MB/sec
BenchmarkBrokerSerializer.projectAll:gc.alloc.rate.norm           10  avgt    5   2540.97 ±    11.2   MB/op
BenchmarkBrokerSerializer.projectAll:gc.count                     10  avgt    5    71.000             counts
BenchmarkBrokerSerializer.projectAll:gc.time                      10  avgt    5   917.000             ms
```

As reviewers will see, the number of surface changes is pretty large. I looked for a way to support both the old and the new mode in the same process, so we could disable this option by default to reduce risk, but I think that would require even more changes.
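The buffered-vs-streaming difference described above can be sketched as follows. This is a minimal illustration with hypothetical names, not Pinot's actual broker classes: the buffered path retains all rows on the heap before serializing, while the streaming path writes each block as it arrives and retains only O(1) extra heap per row.

```java
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two broker strategies (names are illustrative only).
public class StreamingBrokerSketch {

  // Buffered approach (current master): concatenate all stage-1 blocks into a
  // heap structure, then serialize the whole result at once. Heap retention is
  // linear in the number of rows returned.
  static String bufferedSerialize(List<List<String>> serverBlocks) {
    List<String> allRows = new ArrayList<>();
    for (List<String> block : serverBlocks) {
      allRows.addAll(block);
    }
    StringBuilder sb = new StringBuilder();
    for (String row : allRows) {
      sb.append(row).append('\n');
    }
    return sb.toString();
  }

  // Streaming approach (this PR's idea): since the final result is the
  // concatenation of the stage-1 blocks, each block can be written to the
  // client as soon as it arrives. Extra heap per row is constant.
  static void streamingSerialize(List<List<String>> serverBlocks, Writer out)
      throws IOException {
    for (List<String> block : serverBlocks) {
      for (String row : block) {
        out.write(row);
        out.write('\n');
      }
    }
  }
}
```

Because concatenation is order-preserving, both strategies produce byte-identical output for successful queries; the difference only shows up in memory retention and in when the first bytes reach the customer.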
