xinyiZzz opened a new issue, #25514: URL: https://github.com/apache/doris/issues/25514
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no similar issues.

### Description

# 1. Motivation

I am trying to implement an Arrow Flight SQL server in Apache Doris to support ADBC. The goal is to speed up returning large query results from Doris to Python; today Python uses mysql-client to load large batches of data from Doris, which is very slow. The target scenario is data science / machine learning.

# 2. Implementation method

Doris is a column-stored database. It is very expensive to convert the <column-stored data Block> in Doris into a <row-stored MySQL result set> and then convert that back into <column-stored Python Pandas>: the time is mostly spent transposing the data between rows and columns twice, plus serialization.

Borrowing a diagram from Dremio, accelerating Python reads from Doris based on Arrow Flight SQL is similar. Change the above process to convert the <column-stored data Block> in Doris into a <column-stored Arrow RecordBatch> and then into <column-stored Python Pandas>, which speeds up both data conversion and transmission.

# 3. Introduction to Arrow Flight SQL

1. Arrow: a column-stored data format.
2. Arrow Flight: an RPC framework for transmitting data in the Arrow format.
3. Arrow Flight SQL: a client-server protocol developed by the Apache Arrow community that lets clients use the Arrow format to interact with databases that implement the Flight RPC framework. Compared with JDBC/ODBC, it avoids serialization and the two row/column transpositions, and it supports parallel reading and writing.
4. ADBC: a client API that lets different languages access databases, similar to JDBC/ODBC; the database needs to implement an Arrow Flight SQL server.

# 4. Purpose

1. Speed up loading all Doris data field types into Python Pandas.
2. Support SQL query, Set Session Variable, and DDL; data import is not supported yet.
3.
In the future, it may be possible to replace the interfaces that other systems such as Spark use to read Doris.

# 5. Design

## 5.1 Outline design

[picture]

1) The ADBC Client sends a query request to Doris FE and completes authentication on the first request.
2) FE parses the query plan and sends the Fragments to be executed to BE.
3) After BE finishes the Prepare and Open of the Fragment, it returns the Schema of the query result in Arrow format to FE, starts executing the query, and puts the query results into a queue.
4) FE sends the QueryID, the Schema of the query result, and the addresses (Endpoints) of the BEs holding the query results back to the ADBC Client.
5) The ADBC Client requests the BE to pull the query results for the specified QueryID.
6) BE returns the queued query results in Arrow format to the ADBC Client, and the ADBC Client finishes after verifying the Schema of the results.

## 5.2 Detailed design

Arrow version: 13.0.0

Take the ADBC Low-Level API execution flow as an example:

### 5.2.1 ADBC Client

**1.1** `db = adbc_driver_flightsql.connect(uri="grpc://ip:port?user=&password=")`

Creates a Database connector that can maintain multiple shared Connections at the same time. Parameters: Arrow Flight Server IP, port, username, password.

**1.2** `conn = adbc_driver_manager.AdbcConnection(db)`

Creates a Database connection, which triggers authentication and fetches FlightSqlInfo.

1. Auth. Authentication is triggered the first time the Arrow Flight Server is requested. The return value is a Bearer Token, and every subsequent request to the Arrow Flight Server carries this Token.
2. getFlightInfoSqlInfo. Requests the Arrow Flight Server to return SQL Info, including the SQL syntax supported by the database. The return value is the schema and endpoint of the SQL Info. SQL Info is itself data in Arrow format, and its endpoint is still the current Doris FE flight server. All data exchanged over Arrow Flight is in the Arrow format.
Usually, before a piece of Arrow data is fetched, a first request obtains its endpoint and schema, wrapped in a FlightInfo; the endpoint is then requested again to fetch the Arrow data and verify its schema.

3. getStreamSqlInfo. Requests the endpoint to obtain the SQL Info. The result is wrapped in an ArrowArrayStream and associated with a ServerStreamListener.

**1.3** `stmt = adbc_driver_manager.AdbcStatement(conn)`

Maintains the state of the query. It can be a one-time query or a prepared statement; the latter can be reused, but the previous query results become invalid.

**1.4** `stmt.set_sql_query("select * from tpch.hdf5 limit 10;")`

**1.5** `stream, _ = stmt.execute_query()`

Executes the query and returns a RecordBatchReader wrapped in a RecordBatchStream.

1. getFlightInfoStatement. Returns the Endpoints and Schema of the query results, i.e. the Metadata of the Stream.
2. getStreamStatement. Returns a RecordBatchReader for reading the query results.

**1.6** `reader = pyarrow.RecordBatchReader._import_from_c(stream.address)`

Creates a Reader from the Stream.

**1.7** `arrow_data = reader.read_all()`

`read_all()` loops over `RecordBatchReader.ReadNext()` to fetch the RecordBatches of the query results.

Corresponding code example:

```
import adbc_driver_flightsql
import adbc_driver_manager
import pyarrow

db = adbc_driver_flightsql.connect(uri="grpc://127.0.0.1:8040", db_kwargs={
    adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
    adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
})
conn = adbc_driver_manager.AdbcConnection(db)
stmt = adbc_driver_manager.AdbcStatement(conn)
stmt.set_sql_query("select * from tbl1 limit 1000000;")
stream, rows = stmt.execute_query()
reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
arrow_data = reader.read_all()
```

### 5.2.2 Doris FE

**2.1** Authentication

Implement the arrow.flight.auth2 interfaces to respond to authentication when the ADBC client connects for the first time.
Extract the username and password from the request header and authenticate. Generate a 130-bit Token, associate it with the user's permission information, and save it in a cache; the cache size and Token expiration time can be adjusted in Config. Finally, return the Token to the ADBC client.

[picture]

**2.2** getFlightInfoSqlInfo

Responds to the Arrow Flight SQL request by returning SQL Info, implementing the two methods FlightSqlProducer.getFlightInfoSqlInfo() and FlightSqlProducer.getStreamSqlInfo(). When the Arrow Flight Server is initialized, it creates a FlightSqlProducer that responds to ADBC requests. When the FlightSqlProducer is initialized, it binds the SQL Info, including the Arrow version, whether reading and writing are supported, whether DDL statements such as creating tables and modifying schemas are supported, the list of supported functions, and other SQL syntax.

**2.3** getFlightInfoStatement

Executes the query in response to the Arrow Flight SQL request and returns the Endpoints and Schema of the query results, implementing the FlightSqlProducer.getFlightInfoStatement method.

1. Initialize the ConnectContext. The first time the ADBC Client makes an Execute Query request, it initializes a ConnectContext, a Session that stores information related to query execution, including user permissions, Session variables, etc.
2. Initialize the executor FlightStatementExecutor, which saves the Query, QueryID, ConnectContext, and resultServerInfo.
3. Execute the Query. Initialize the QueryID and StmtExecutor, then call executeArrowFlightQuery to generate the query plan, initialize and run the Coordinator, and send the Fragments to the designated BEs.
4. Get the Arrow Result Set Schema. Request the Arrow Flight Server of the BE where the Result Sink Node of the query plan is located; the BE generates the Schema of the query result after the Fragment completes Prepare and Open.
5.
Use the Query and QueryID to initialize a Ticket; use the Arrow Flight Server address of the BE where the Result Sink Node of the query plan is located (i.e. the server holding the Arrow Result Set of the query results) together with the Ticket to initialize a FlightEndpoint; and finally use the Arrow Result Set Schema and Endpoints to initialize a FlightInfo, which is sent back to the ADBC Client.

[picture]

### 5.2.3 Doris BE

**3.1** Execute Fragment

Executes the Fragment and returns the Arrow Result Set Schema. The overall execution flow is the same as before; the difference is that the type of the ResultSinkNode in the Fragment is no longer MYSQL_PROTOCAL but ARROW_FLIGHT_PROTOCAL. After Prepare and Open complete, the Arrow Schema of the query result is put into a Map, waiting for FE to fetch it, and an ArrowFlightResultWriter is initialized. When subsequent query results reach the ResultSink, ArrowFlightResultWriter::append_block converts each data Block into a RecordBatch in Arrow format and puts it into a separate queue, the BufferControlBlock, waiting for the ADBC Client to pull it.

**3.2** GetStatement

After receiving the Endpoints sent back by Doris FE, the ADBC Client requests the corresponding Arrow Flight Server address on the Doris BE. On receiving the request, the BE first decodes the Ticket to obtain the SQL and QueryID, then uses the QueryID to find the previously saved Arrow Schema of the query result and initializes a RecordBatchReader to return, which the ADBC Client uses to pull data afterwards; this implements the FlightSqlServerBase::DoGetStatement() method.
In addition, when the ADBC Client requests the Doris BE's Arrow Flight Server for the first time, the header also contains the Bearer Token, but the HeaderAuthServerMiddleware and BearerAuthServerMiddleware used when the BE Arrow Flight Server is initialized are both no-ops, i.e. no verification is done. Currently, the BE Arrow Flight Server's permission check is based only on the QueryID: the ADBC Client is allowed to read the data as long as the QueryID is correct.

[picture]

**3.3** ArrowFlightBatchReader::ReadNext

The ADBC Client calls the ReadNext method of the previously returned RecordBatchReader in a loop to pull data; the BE Arrow Flight Server uses the QueryID in the request to pull Arrow-format RecordBatches from the BufferControlBlock and return them.

# 6. How to use

## Python

```
#!/usr/bin/env python
# -*- coding: utf-8 -*-

sql = "select * from tpch.hdf5 limit 1000000;"

# PEP 249 (DB-API 2.0) API wrapper for the ADBC Driver Manager.
def dbapi_adbc_execute_fetchallarrow():
    import adbc_driver_manager
    import adbc_driver_flightsql.dbapi as flight_sql
    import pandas
    from datetime import datetime

    conn = flight_sql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
        adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
        adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
    })
    cursor = conn.cursor()
    start_time = datetime.now()
    cursor.execute(sql)
    arrow_data = cursor.fetchallarrow()
    dataframe = arrow_data.to_pandas()
    print("\n##################\n dbapi_adbc_execute_fetchallarrow"
          + ", cost:" + str(datetime.now() - start_time)
          + ", bytes:" + str(arrow_data.nbytes)
          + ", len(arrow_data):" + str(len(arrow_data)))
    print(dataframe.info(memory_usage='deep'))
    print(dataframe)

# ADBC reads data directly into a pandas dataframe, which is faster than
# fetchallarrow first and then to_pandas.
def dbapi_adbc_execute_fetch_df():
    import adbc_driver_manager
    import adbc_driver_flightsql.dbapi as flight_sql
    import pandas
    from datetime import datetime

    conn = flight_sql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
        adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
        adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
    })
    cursor = conn.cursor()
    start_time = datetime.now()
    cursor.execute(sql)
    dataframe = cursor.fetch_df()
    print("\n##################\n dbapi_adbc_execute_fetch_df"
          + ", cost:" + str(datetime.now() - start_time))
    print(dataframe.info(memory_usage='deep'))
    print(dataframe)

# Can read multiple partitions in parallel.
def dbapi_adbc_execute_partitions():
    import adbc_driver_flightsql.dbapi as flight_sql
    import adbc_driver_manager
    from datetime import datetime
    import pandas

    conn = flight_sql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
        adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
        adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
    })
    cursor = conn.cursor()
    start_time = datetime.now()
    partitions, schema = cursor.adbc_execute_partitions(sql)
    cursor.adbc_read_partition(partitions[0])
    arrow_data = cursor.fetchallarrow()
    dataframe = arrow_data.to_pandas()
    print("\n##################\n dbapi_adbc_execute_partitions"
          + ", cost:" + str(datetime.now() - start_time)
          + ", len(partitions):" + str(len(partitions)))
    print(dataframe.info(memory_usage='deep'))
    print(dataframe)

# The ADBC low-level API is the root module; it provides a fairly direct
# 1:1 mapping to the C API definitions in Python.
# For a higher-level interface, use adbc_driver_manager.dbapi (this requires PyArrow).
def low_level_api_execute_query():
    import adbc_driver_flightsql
    import adbc_driver_manager
    from datetime import datetime
    import pyarrow

    db = adbc_driver_flightsql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
        adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
        adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
    })
    # db = adbc_driver_flightsql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
    #     adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER.value: "Bearer <token>",
    #     adbc_driver_flightsql.DatabaseOptions.TLS_SKIP_VERIFY.value: "true",
    # })
    # db = adbc_driver_flightsql.connect(uri="grpc://10.16.10.8:10478")
    conn = adbc_driver_manager.AdbcConnection(db)
    # print("conn.adbc_get_info()[vendor_name]" + conn.adbc_get_info()["vendor_name"])
    stmt = adbc_driver_manager.AdbcStatement(conn)
    stmt.set_sql_query(sql)
    start_time = datetime.now()
    stream, rows = stmt.execute_query()
    reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
    arrow_data = reader.read_all()
    dataframe = arrow_data.to_pandas()
    print("\n##################\n low_level_api_execute_query"
          + ", cost:" + str(datetime.now() - start_time)
          + ", stream.address:" + str(stream.address)
          + ", rows:" + str(rows)
          + ", bytes:" + str(arrow_data.nbytes)
          + ", len(arrow_data):" + str(len(arrow_data)))
    print(dataframe.info(memory_usage='deep'))
    print(dataframe)

# Can read multiple partitions in parallel.
def low_level_api_execute_partitions():
    import adbc_driver_flightsql
    import adbc_driver_manager
    from datetime import datetime
    import pyarrow

    db = adbc_driver_flightsql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
        adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
        adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
    })
    conn = adbc_driver_manager.AdbcConnection(db)
    stmt = adbc_driver_manager.AdbcStatement(conn)
    stmt.set_sql_query(sql)
    start_time = datetime.now()
    streams = stmt.execute_partitions()
    for s in streams[0]:
        stream = conn.read_partition(s)
        reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
        arrow_data = reader.read_all()
        dataframe = arrow_data.to_pandas()
    print("\n##################\n low_level_api_execute_partitions"
          + ", cost:" + str(datetime.now() - start_time)
          + ", streams.size:" + str(len(streams))
          + ", " + str(len(streams[0])) + ", " + str(streams[2]))

# Test bulk ingest.
def dbapi_bulk_ingest():
    import adbc_driver_flightsql.dbapi as flight_sql
    import pyarrow

    conn = flight_sql.connect(uri="grpc://10.16.10.8:10478?user=root&password=")
    cursor = conn.cursor()
    table = pyarrow.table([[1, 2], ["a", None]], names=["ints", "strs"])
    cursor.adbc_ingest("sample", table)
    cursor.execute("SELECT COUNT(DISTINCT ints) FROM sample")
    cursor.fetchall()
```

# 7. Progress and TODO

1. [enhancement](thirdparty) upgrade thirdparty libs - again https://github.com/apache/doris/pull/23414
2. [feature-wip](arrow-flight)(step1) BE support Arrow Flight server, read data only https://github.com/apache/doris/pull/23765
3. [feature-wip](arrow-flight)(step2) FE support Arrow Flight server https://github.com/apache/doris/pull/24314
4. [feature-wip](arrow-flight)(step3) Support authentication and user session https://github.com/apache/doris/pull/24772
5. TODO: (step4) Support session variables
6. TODO: (step5) Support DDL
7. TODO: (step6) Add regression tests and UT
8.
TODO: More performance tests.

### Use case

_No response_

### Related issues

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
