xinyiZzz opened a new issue, #25514:
URL: https://github.com/apache/doris/issues/25514

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no 
similar issues.
   
   
   ### Description
   
   # 1. Motivation
   
   I am trying to implement an Arrow Flight SQL server in Apache Doris to support 
ADBC. The goal is to speed up returning large query results from Doris to 
Python: loading large batches of data from Doris into Python via mysql-client 
is currently very slow. The target scenario is data science / machine learning.
   
   # 2. Implementation method
   
   Doris is a column-oriented database. It is very expensive to convert the 
<column-stored data Block> in Doris into a <row-stored MySQL result set> and 
then convert it back into <column-stored Python pandas>. The time is mainly 
spent transposing the data between rows and columns twice, plus serialization.
   
   Borrowing a diagram from Dremio, which accelerates Python reads over Arrow 
Flight SQL in a similar way: change the above process so that the 
<column-stored data Block> in Doris is converted into a <column-stored Arrow 
RecordBatch> and then into <column-stored Python pandas>, which speeds up both 
data conversion and transmission.
   
   # 3. Introduction to Arrow Flight SQL
   
   1. Arrow, a columnar data format.
   
   2. Arrow Flight, an RPC framework for transmitting data in the Arrow format.
   
   3. Arrow Flight SQL, a client-server protocol developed by the Apache Arrow 
community that lets clients use the Arrow format to interact with databases 
that implement the Flight RPC framework. Compared with JDBC/ODBC, it avoids 
serialization and the two row/column transpositions, and it supports parallel 
reads and writes.
   
   4. ADBC, a client API that lets different languages access a database, 
similar to JDBC/ODBC. The database needs to implement an Arrow Flight SQL 
server.
   
   # 4. Purpose
   
   1. Speed up loading all Doris data field types into Python pandas.
   
   2. Support SQL queries, Set Session Variable, and DDL; data import is not 
supported yet.
   
   3. In the future, it may also replace the interface other systems such as 
Spark use to read from Doris.
   
   # 5. Design
   ## 5.1 Outline design
   [picture]
   1) ADBC Client sends a query request to Doris FE and completes the 
authentication on the first request.
   
   2) FE parses and plans the query, then sends the Fragments to be executed to BE.
   
   3) After BE completes the prepare and open of the Fragment, it returns the 
Schema of the query result in Arrow format to FE, starts executing the query, 
and puts the query results into a queue.
   
   4) FE sends the QueryID, the Schema of the query result, and the BE address 
(Endpoints) where the query result is located back to the ADBC Client.
   
   5) ADBC Client requests BE to pull the query results of the specified 
QueryID.
   
   6) BE returns the queued query results in Arrow format to the ADBC Client, 
and the ADBC Client finishes after verifying the Schema of the results.
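   
   Taken together, the steps above map directly onto ADBC client calls; a 
minimal sketch (the URI and query are placeholders; see 5.2.1 and section 6 
for full examples):
   
   ```
   import adbc_driver_flightsql
   import adbc_driver_manager
   import pyarrow
   
   # Step 1: the first request authenticates against FE.
   db = adbc_driver_flightsql.connect(uri="grpc://127.0.0.1:8040?user=root&password=")
   conn = adbc_driver_manager.AdbcConnection(db)
   stmt = adbc_driver_manager.AdbcStatement(conn)
   stmt.set_sql_query("select 1;")
   # Steps 2-4: FE plans the query, BE opens the Fragment, FE sends back Schema + Endpoints.
   stream, _ = stmt.execute_query()
   reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
   # Steps 5-6: pull the Arrow results from BE and verify the Schema.
   arrow_data = reader.read_all()
   ```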
   
   ## 5.2 Detailed design
   
   Arrow version: 13.0.0
   Take the ADBC Low-Level API execution process as an example:
   
   ### 5.2.1 ADBC Client
   
   **1.1** db = 
adbc_driver_flightsql.connect(uri="grpc://ip:port?user=&password=")
   Create a Database connector that can maintain multiple shared Connections at 
the same time. Parameters: Arrow Flight Server IP, port, username, password
   
   **1.2** conn = adbc_driver_manager.AdbcConnection(db)
   Creating a Database connection triggers authentication and fetches the 
FlightSqlInfo.
   
   1. Auth
   Authentication is triggered the first time the Arrow Flight Server is 
requested.
   The return value is a Bearer Token; each subsequent request to the Arrow 
Flight Server carries this Token.
   
   2. getFlightInfoSqlInfo
   Requests the Arrow Flight Server to return the SQL Info, including the SQL 
syntax supported by the database, etc. The return value is the schema and 
endpoint of the SQL Info; the SQL Info is itself data in Arrow format, and the 
endpoint is still the current Doris FE Flight Server.
   
   All data exchanged over Arrow Flight is in the Arrow format. Usually, before 
a piece of Arrow data is fetched, a first request obtains its endpoint and 
schema, encapsulated in a FlightInfo; the endpoint is then requested to obtain 
the Arrow data, whose schema is verified against the FlightInfo.
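   
   In pyarrow.flight terms, this two-step pattern looks roughly like the 
following sketch (plain Flight is shown for simplicity; Arrow Flight SQL wraps 
the command in a protobuf, and the host and query here are placeholders):
   
   ```
   import pyarrow.flight as flight
   
   client = flight.FlightClient("grpc://127.0.0.1:8040")
   descriptor = flight.FlightDescriptor.for_command(b"select 1;")
   
   # First request: obtain the endpoint(s) and schema, wrapped in a FlightInfo.
   info = client.get_flight_info(descriptor)
   
   # Second request: ask each endpoint for the actual Arrow data.
   for endpoint in info.endpoints:
       reader = client.do_get(endpoint.ticket)
       table = reader.read_all()  # schema can be verified against info.schema
   ```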
   
   3. getStreamSqlInfo
   Request the endpoint to obtain SQL Info. The result is wrapped in 
ArrowArrayStream and associated with a ServerStreamListener.
   
   **1.3** stmt = adbc_driver_manager.AdbcStatement(conn)
   Maintains the state of a query. It can be a one-time query or a prepared 
statement; a prepared statement can be reused, but the previous query results 
become invalid.
   
   **1.4** stmt.set_sql_query("select * from tpch.hdf5 limit 10;")
   
   **1.5** stream, _ = stmt.execute_query()
   Executing Query returns a RecordBatchReader, wrapped in a RecordBatchStream.
   
   1. getFlightInfoStatement
   Returns the Endpoints and Schema where the query results are located, which 
is the Metadata of the Stream.
   
   2. getStreamStatement
   Returns a RecordBatchReader for reading query results.
   
   **1.6** reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
   Creates a Reader from the Stream.
   
   **1.7** arrow_data = reader.read_all()
   read_all() loops over RecordBatchReader.ReadNext() to fetch the RecordBatches 
of the query results.
   
   Corresponding code example:
   ```
   import adbc_driver_flightsql
   import adbc_driver_manager
   import pyarrow
   
   db = adbc_driver_flightsql.connect(uri="grpc://127.0.0.1:8040", db_kwargs={
               adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
               adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
           })
   conn = adbc_driver_manager.AdbcConnection(db)
   stmt = adbc_driver_manager.AdbcStatement(conn)
   stmt.set_sql_query("select * from tbl1 limit 1000000;")
   stream, rows = stmt.execute_query()
   reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
   arrow_data = reader.read_all()
   ```
   
   ### 5.2.2 Doris FE
   
   **2.1** Authentication
   Implement the arrow.flight.auth2 related interfaces to handle authentication 
when the ADBC client connects for the first time: extract the username and 
password from the request header and authenticate them. After generating a 
130-bit Token, associate the Token with the user's permission information and 
save it in a cache; the cache size and Token expiration time can be adjusted 
in Config. Finally, the Token is returned to the ADBC client.
   [picture]
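   
   On the client side, once a Token has been issued, a connection can also be 
created by passing the Bearer Token directly instead of username/password; a 
sketch mirroring the commented-out options in section 6 (endpoint and token 
are placeholders):
   
   ```
   import adbc_driver_flightsql
   import adbc_driver_manager
   
   db = adbc_driver_flightsql.connect(uri="grpc://127.0.0.1:8040", db_kwargs={
               adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER.value: "Bearer <token>",
           })
   conn = adbc_driver_manager.AdbcConnection(db)
   ```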
   
   **2.2** getFlightInfoSqlInfo
   Responds to the Arrow Flight SQL request by returning the SQL Info, 
implementing the two methods FlightSqlProducer.getFlightInfoSqlInfo() and 
FlightSqlProducer.getStreamSqlInfo().
   
   When the Arrow Flight Server is initialized, it creates a FlightSqlProducer 
that responds to ADBC requests. When the FlightSqlProducer is initialized, it 
binds the SQL Info, including the Arrow version, whether reads and writes are 
supported, whether DDL statements such as creating tables and modifying 
schemas are supported, the list of supported functions, and other SQL syntax.
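   
   From the client, parts of this bound information can be inspected through the 
DB-API wrapper; a sketch (the endpoint is a placeholder):
   
   ```
   import adbc_driver_flightsql.dbapi as flight_sql
   
   conn = flight_sql.connect(uri="grpc://127.0.0.1:8040")
   info = conn.adbc_get_info()  # dict with keys such as "vendor_name", "vendor_version"
   print(info.get("vendor_name"), info.get("vendor_version"))
   ```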
   
   **2.3** getFlightInfoStatement
   Executes the Query in response to the Arrow Flight SQL request and returns 
the Endpoints and Schema of the query results, implementing the 
FlightSqlProducer.getFlightInfoStatement method.
   
   1. Initialize the ConnectContext. The first time the ADBC Client makes an 
Execute Query request, it initializes the ConnectContext, a Session that 
stores information related to query execution, including user permissions, 
Session variables, etc.
   
   2. Initialize the executor FlightStatementExecutor, which saves the Query, 
QueryID, connectContext, and resultServerInfo.
   
   3. Execute the Query. Initialize the QueryID and StmtExecutor, then call 
executeArrowFlightQuery to generate the query plan, initialize and execute the 
Coordinator, and send the Fragments to the specified BE.
   
   4. Get the Arrow Result Set Schema. Request the Arrow Flight Server of the 
BE where the Result Sink Node of the query plan is located; the latter 
generates the Schema of the query result after the Fragment completes Prepare 
and Open.
   
   5. Use the Query and QueryID to initialize a Ticket. Use the Arrow Flight 
Server address of the BE where the Result Sink Node of the query plan is 
located (that is, the server address holding the query result Arrow Result 
Set) together with the Ticket to initialize a FlightEndpoint. Finally, use the 
Arrow Result Set Schema and the Endpoints to initialize a FlightInfo and send 
it back to the ADBC Client, as sketched below.
   [picture]
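   
   FE assembles this in Java; expressed in pyarrow.flight terms for 
illustration, the construction is roughly as follows (all names and values 
below are placeholders):
   
   ```
   import pyarrow
   import pyarrow.flight as flight
   
   schema = pyarrow.schema([("c1", pyarrow.int64())])        # Arrow Result Set Schema from BE
   ticket = flight.Ticket(b"<query>:<query_id>")             # Query + QueryID
   location = flight.Location.for_grpc_tcp("be_host", 8040)  # BE Arrow Flight Server address
   endpoint = flight.FlightEndpoint(ticket, [location])
   descriptor = flight.FlightDescriptor.for_command(b"<query>")
   flight_info = flight.FlightInfo(schema, descriptor, [endpoint], -1, -1)  # sent back to the ADBC Client
   ```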
   
   ### 5.2.3 Doris BE
   
   **3.1** Execute Fragment
   Executes the Fragment and returns the Arrow Result Set Schema. The overall 
execution process is the same as before; the difference is that the type of 
the ResultSinkNode in the Fragment is no longer MYSQL_PROTOCAL but 
ARROW_FLIGHT_PROTOCAL. After Prepare and Open are completed, the Arrow Schema 
of the query result is put into a Map, waiting for FE to fetch it and 
initialize the ArrowFlightResultWriter.
   
   After subsequent query results arrive at the ResultSink, 
ArrowFlightResultWriter::append_block converts each data Block into an Arrow 
RecordBatch and puts it into a separate queue, the BufferControlBlock, waiting 
for the ADBC Client to pull it; see the sketch below.
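   
   Conceptually, the per-Block conversion that 
ArrowFlightResultWriter::append_block performs (in C++) looks like this 
pyarrow sketch, with made-up column names and a plain list standing in for the 
BufferControlBlock queue:
   
   ```
   import pyarrow
   
   # One column-stored data Block, already columnar in memory.
   columns = [pyarrow.array([1, 2, 3]), pyarrow.array(["a", "b", "c"])]
   # Convert it into an Arrow RecordBatch without transposing rows and columns.
   batch = pyarrow.RecordBatch.from_arrays(columns, names=["c1", "c2"])
   # Queue it until the ADBC Client pulls it.
   result_queue = []
   result_queue.append(batch)
   ```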
   
   **3.2** GetStatement
   After receiving the Endpoints sent back by Doris FE, the ADBC Client requests 
the corresponding Arrow Flight Server address on Doris BE. On receiving the 
request, Doris BE first decodes the Ticket to obtain the SQL and QueryID, then 
uses the QueryID to find the previously saved Arrow Schema of the query result 
and returns an initialized RecordBatchReader, which the ADBC Client uses to 
subsequently pull data. This implements the 
FlightSqlServerBase::DoGetStatement() method.
   
   In addition, when the ADBC Client requests Doris BE's Arrow Flight Server 
for the first time, the header also contains the Bearer Token. However, the 
HeaderAuthServerMiddleware and BearerAuthServerMiddleware installed when the 
BE Arrow Flight Server is initialized are both no-ops, i.e., they perform no 
verification. So currently the BE Arrow Flight Server's permission check is 
based only on the QueryID: the ADBC Client is allowed to read the data as long 
as the QueryID is correct.
   [picture]
   
   **3.3** ArrowFlightBatchReader::ReadNext
   The ADBC Client repeatedly calls the ReadNext method of the previously 
returned RecordBatchReader to pull data; the BE Arrow Flight Server uses the 
QueryID in the request to take Arrow RecordBatches out of the 
BufferControlBlock and return them.
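   
   On the client side, read_all() is equivalent to the following pull loop, 
where each read_next_batch() call corresponds to one ReadNext on the server 
(`reader` is the RecordBatchReader from section 5.2.1):
   
   ```
   batches = []
   while True:
       try:
           # One pull per iteration: BE takes the next Arrow RecordBatch out of
           # the BufferControlBlock for this QueryID and returns it.
           batches.append(reader.read_next_batch())
       except StopIteration:  # the stream is exhausted
           break
   ```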
   
   # 6. How to use
   ## Python
   ```
   #!/usr/bin/env python
   # -*- coding: utf-8 -*-
   
   sql = "select * from tpch.hdf5 limit 1000000;"
   
   # PEP 249 (DB-API 2.0) API wrapper for the ADBC Driver Manager.
   def dbapi_adbc_execute_fetchallarrow():
       import adbc_driver_manager
       import adbc_driver_flightsql.dbapi as flight_sql
       import pandas
       from datetime import datetime
   
       conn = flight_sql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
                   adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
                   adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
               })
       cursor = conn.cursor()
       start_time = datetime.now()
       cursor.execute(sql)
       arrow_data = cursor.fetchallarrow()
       dataframe = arrow_data.to_pandas()
       print("\n##################\n dbapi_adbc_execute_fetchallarrow" + ", 
cost:" + str(datetime.now() - start_time) + "bytes:" + str(arrow_data.nbytes) + 
", len(arrow_data):" + str(len(arrow_data)))
       print(dataframe.info(memory_usage='deep'))
       print(dataframe)
   
   # ADBC reads data into a pandas dataframe, which is faster than fetchallarrow first and then to_pandas.
   def dbapi_adbc_execute_fetch_df():
       import adbc_driver_manager
       import adbc_driver_flightsql.dbapi as flight_sql
       import pandas
       from datetime import datetime
   
       conn = flight_sql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
                   adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
                   adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
               })
       cursor = conn.cursor()
       start_time = datetime.now()
       cursor.execute(sql)
       dataframe = cursor.fetch_df()
       print("\n##################\n dbapi_adbc_execute_fetch_df" + ", cost:" + str(datetime.now() - start_time))
       print(dataframe.info(memory_usage='deep'))
       print(dataframe)
   
   # Can read multiple partitions in parallel.
   def dbapi_adbc_execute_partitions():
       import adbc_driver_flightsql.dbapi as flight_sql
       import adbc_driver_manager
       from datetime import datetime
       import pandas
   
       conn = flight_sql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
                   adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
                   adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
               })
       cursor = conn.cursor()
       start_time = datetime.now()
       partitions, schema = cursor.adbc_execute_partitions(sql)
       cursor.adbc_read_partition(partitions[0])
       arrow_data = cursor.fetchallarrow()
       dataframe = arrow_data.to_pandas()
       print("\n##################\n dbapi_adbc_execute_partitions" + ", cost:" 
+ str(datetime.now() - start_time) + ", len(partitions):" + 
str(len(partitions)))
       print(dataframe.info(memory_usage='deep'))
       print(dataframe)
   
   # The ADBC low-level API is the root module; it provides a fairly direct 1:1 mapping to the C API definitions in Python.
   # For a higher-level interface, use adbc_driver_manager.dbapi. (This requires PyArrow.)
   def low_level_api_execute_query():
       import adbc_driver_flightsql
       import adbc_driver_manager
       from datetime import datetime
       import pyarrow
   
       db = adbc_driver_flightsql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
                   adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
                   adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
               })
       # db = adbc_driver_flightsql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
       #             adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER.value: "Bearer <token>",
       #             adbc_driver_flightsql.DatabaseOptions.TLS_SKIP_VERIFY.value: "true",
       #         })
       # db = adbc_driver_flightsql.connect(uri="grpc://10.16.10.8:10478")
       # db = adbc_driver_flightsql.connect(uri="grpc://10.16.10.8:10478")
       conn = adbc_driver_manager.AdbcConnection(db)
       # print("conn.adbc_get_info()[vendor_name]" + 
conn.adbc_get_info()["vendor_name"])
       stmt = adbc_driver_manager.AdbcStatement(conn)
       stmt.set_sql_query(sql)
       start_time = datetime.now()
       stream, rows = stmt.execute_query()
       reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
       arrow_data = reader.read_all()
       dataframe = arrow_data.to_pandas()
       print("\n##################\n low_level_api_execute_query" + ", cost:" + 
str(datetime.now() - start_time) + ",stream.address:" + str(stream.address) + 
", rows:" + str(rows) + "bytes:" + str(arrow_data.nbytes) + ", 
len(arrow_data):" + str(len(arrow_data)))
       print(dataframe.info(memory_usage='deep'))
       print(dataframe)
   
   # Can read multiple partitions in parallel.
   def low_level_api_execute_partitions():
       import adbc_driver_flightsql
       import adbc_driver_manager
       from datetime import datetime
       import pyarrow
   
       db = adbc_driver_flightsql.connect(uri="grpc://10.16.10.8:10478", db_kwargs={
                   adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
                   adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
               })
       conn = adbc_driver_manager.AdbcConnection(db)
       stmt = adbc_driver_manager.AdbcStatement(conn)
       stmt.set_sql_query(sql)
       start_time = datetime.now()
       streams = stmt.execute_partitions()
       for s in streams[0]:
           stream = conn.read_partition(s)
           reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
           arrow_data = reader.read_all()
           dataframe = arrow_data.to_pandas()
       print("\n##################\n low_level_api_execute_partitions" + ", 
cost:" + str(datetime.now() - start_time) + "streams.size:" + str(len(streams)) 
+ ", "  + str(len(streams[0])) + ", " + str(streams[2]))
   
   # test bulk ingest
   def dbapi_bulk_ingest():
       import adbc_driver_flightsql.dbapi as flight_sql
       import pyarrow
       conn = flight_sql.connect(uri="grpc://10.16.10.8:10478?user=root&password=")
       cursor = conn.cursor()
       table = pyarrow.table([[1, 2], ["a", None]], names=["ints", "strs"])
       cursor.adbc_ingest("sample", table)
       cursor.execute("SELECT COUNT(DISTINCT ints) FROM sample")
       cursor.fetchall()
   ```
   
   # 7. Progress and TODO
   
   1. [enhancement](thirdparty) upgrade thirdparty libs - again 
https://github.com/apache/doris/pull/23414
   
   2. [feature-wip] (arrow-flight) (step1) BE support Arrow Flight server, read 
data only https://github.com/apache/doris/pull/23765
   
   3. [feature-wip](arrow-flight)(step2) FE support Arrow Flight server 
https://github.com/apache/doris/pull/24314
   
   4. [feature-wip](arrow-flight)(step3) Support authentication and user 
session https://github.com/apache/doris/pull/24772
   
   5. TODO: (step4) Support session variable
   
   6. TODO: (step5) Support DDL
   
   7. TODO: (step6) Add regression-test and UT
   
   8. TODO: More performance tests…
   
   ### Use case
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

