gortiz opened a new pull request, #17613:
URL: https://github.com/apache/pinot/pull/17613

   In SSE, brokers execute the reduce phase, which merges data from different 
servers. The key point of MSE is that it has several map-reduce phases, with each 
reduce executed on one or more servers. This means that in MSE, the broker 
either receives data from a single server (in which case the data is ready 
to be sent) or from multiple servers; in the latter case, the final result is 
guaranteed to be the concatenation of the data from each stage 1 server.
   
   For historical reasons, the broker reads all data sent from stage 1, 
concatenates it into a Java heap structure, and then serializes it into the 
format sent to the customer (usually JSON, though we recently added gRPC as 
well). 
   
   This means we allocate and retain, for a while, several copies of the data 
the customer requested. That can be an issue for queries that return a 
large amount of data, but there is no technical reason to implement brokers 
this way: given the properties described in the first paragraph, brokers could 
simply pipeline the received data to the customer.
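   To make the contrast concrete, here is a minimal sketch of the two strategies. All names are hypothetical and the real broker and serializer classes differ; the point is only that the concatenation property lets each block be forwarded as it arrives, so broker memory no longer grows with the number of rows.

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.OutputStream;
   import java.nio.charset.StandardCharsets;
   import java.util.List;

   // Hypothetical illustration only; the actual Pinot classes and wire formats differ.
   public class PipelineSketch {

     // Old approach: concatenate all server blocks on the heap, then serialize once.
     static void bufferThenSerialize(List<String> serverBlocks, OutputStream out) throws IOException {
       StringBuilder all = new StringBuilder();            // retains a full copy of the result
       for (String block : serverBlocks) {
         all.append(block);
       }
       out.write(all.toString().getBytes(StandardCharsets.UTF_8)); // a second copy during encoding
     }

     // Streaming approach: forward each block as it arrives. Memory stays constant in
     // the number of rows because the final result is just the blocks in sequence.
     static void pipeline(List<String> serverBlocks, OutputStream out) throws IOException {
       for (String block : serverBlocks) {
         out.write(block.getBytes(StandardCharsets.UTF_8));
         out.flush();                                      // the customer sees rows immediately
       }
     }

     public static void main(String[] args) throws IOException {
       List<String> blocks = List.of("row1\n", "row2\n", "row3\n");
       ByteArrayOutputStream a = new ByteArrayOutputStream();
       ByteArrayOutputStream b = new ByteArrayOutputStream();
       bufferThenSerialize(blocks, a);
       pipeline(blocks, b);
       // Both strategies produce identical bytes; only memory use and latency differ.
       System.out.println(a.toString(StandardCharsets.UTF_8).equals(b.toString(StandardCharsets.UTF_8)));
     }
   }
   ```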
   
   This approach reduces the memory cost of running MSE queries in brokers from 
linear in the number of rows returned to a very low constant. It should also 
reduce broker latency for large queries. As a side effect, error behavior 
changes slightly: currently, we return either result rows or an error (without 
rows). With a streaming broker, if one server finishes successfully while 
another fails, the result may include some rows and an error.
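   A small sketch of the new failure semantics, with invented names (the real response format and exception reporting differ): rows that have already been flushed to the customer cannot be retracted, so a later server failure can only be reported after them.

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.OutputStream;
   import java.nio.charset.StandardCharsets;
   import java.util.List;

   // Hypothetical illustration of streaming error semantics; names are made up.
   public class StreamingErrorSketch {

     // Streams blocks as they arrive. If a later stage-1 server fails, the rows
     // already written are gone to the customer, so the error is appended after them.
     static void stream(List<String> blocks, int failAt, OutputStream out) throws IOException {
       for (int i = 0; i < blocks.size(); i++) {
         if (i == failAt) {
           out.write("ERROR: server failed\n".getBytes(StandardCharsets.UTF_8));
           return;
         }
         out.write(blocks.get(i).getBytes(StandardCharsets.UTF_8));
         out.flush(); // already visible to the customer
       }
     }

     public static void main(String[] args) throws IOException {
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       stream(List.of("row1\n", "row2\n", "row3\n"), 2, out);
       // The response contains some rows *and* an error, unlike the buffered
       // broker, which would have returned only the error.
       System.out.print(out.toString(StandardCharsets.UTF_8));
     }
   }
   ```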
   
   As a picture speaks a thousand words, here is a screenshot of a request in 
master:
   <img width="1710" height="832" alt="image" src="https://github.com/user-attachments/assets/fb39d88c-ceaf-4138-aac1-010c823ef13d" />
   
   And here is the same request in this branch. The query runs a bit faster, 
but as shown above, the overall performance may not be that different; the 
important part is that the customer starts receiving results earlier.
   <img width="1710" height="832" alt="image" src="https://github.com/user-attachments/assets/f6dad376-5fc3-4be3-bed2-17d9d6624603" />
   
   This PR also includes a benchmark that executes a simple `select *` query. 
The difference in raw latency is not that high because we still pay the 
server-side cost, but the allocation per operation (`gc.alloc.rate.norm`, in 
MB/op) and especially the total GC time are significantly better. 
   
   ```
   Benchmark                                                (_segments)  Mode  Cnt      Score      Error   Units
   BenchmarkBrokerSerializer.projectAll                              10  avgt    5    378.497 ±  130.619   ms/op
   BenchmarkBrokerSerializer.projectAll:gc.alloc.rate                10  avgt    5   5918.328 ± 5726.710  MB/sec
   BenchmarkBrokerSerializer.projectAll:gc.alloc.rate.norm           10  avgt    5    2394.66 ±    11.81   MB/op
   BenchmarkBrokerSerializer.projectAll:gc.count                     10  avgt    5     36.000             counts
   BenchmarkBrokerSerializer.projectAll:gc.time                      10  avgt    5    790.000                 ms

   Benchmark                                                (_segments)  Mode  Cnt      Score      Error   Units
   BenchmarkBrokerSerializer.projectAll                              10  avgt    5    382.165 ±    7.528   ms/op
   BenchmarkBrokerSerializer.projectAll:gc.alloc.rate                10  avgt    5   6066.623 ± 4853.627  MB/sec
   BenchmarkBrokerSerializer.projectAll:gc.alloc.rate.norm           10  avgt    5    2540.97 ±     11.2   MB/op
   BenchmarkBrokerSerializer.projectAll:gc.count                     10  avgt    5     71.000             counts
   BenchmarkBrokerSerializer.projectAll:gc.time                      10  avgt    5    917.000                 ms
   ```
   
   As reviewers will see, the surface of the change is pretty large. I looked 
for a way to support both the old and new modes in the same process, so we 
could disable this option by default to reduce risk, but I think that would 
require even more changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

