gnehil opened a new pull request, #140:
URL: https://github.com/apache/doris-spark-connector/pull/140
# Proposed changes
## Optimizations
1. Optimize data transmission (see the sketch after this list)
   - Before optimization: after formatting the batch data, the whole batch was packaged into a single StringEntity and sent in one piece, so the entire serialized batch had to be held in memory.
   - After optimization: the Stream Load request body is transmitted via HTTP chunked transfer through an InputStream, which eliminates the memory consumption of converting the whole batch into a string when building the entity.
2. Optimize partitioned data iteration (see the sketch after this list)
   - Before optimization: before batch splitting, iterator.grouped was used to group the partition iterator by batch size. Each group materializes a full batch of records in memory at once, so a large batch setting drives memory usage up and can easily lead to OOM.
   - After optimization: iterate directly over the partition's iterator and wrap it in an InputStream backed by Iterator&lt;Row&gt;. The InputStream reads one row at a time and maintains an internal counter; once the number of rows read reaches the batch size, the stream ends and the Stream Load request is submitted. During the whole iteration the source side only ever holds the current row rather than caching an entire batch, which reduces memory usage.
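
Below is a minimal sketch of the combined idea, assuming Apache HttpClient 4.x and rows that are already serialized to String. `RowBatchInputStream`, `partitionIterator`, and `streamLoadUrl` are illustrative names for this sketch, not the connector's actual classes or fields:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Streams at most batchSize rows from a partition iterator, one row at a time. */
class RowBatchInputStream extends InputStream {
    private final Iterator<String> rows;  // partition iterator, rows pre-serialized
    private final int batchSize;
    private int rowCount = 0;             // counter maintained inside the stream
    private InputStream current = new ByteArrayInputStream(new byte[0]);

    RowBatchInputStream(Iterator<String> rows, int batchSize) {
        this.rows = rows;
        this.batchSize = batchSize;
    }

    @Override
    public int read() throws IOException {
        int b = current.read();
        if (b != -1) {
            return b;  // still inside the current row
        }
        // Current row is exhausted: end the stream once the batch is full
        // or the partition iterator runs out of rows.
        if (rowCount >= batchSize || !rows.hasNext()) {
            return -1;
        }
        // Pull exactly one more row; a line separator delimits rows in the body.
        byte[] row = (rows.next() + "\n").getBytes(StandardCharsets.UTF_8);
        current = new ByteArrayInputStream(row);
        rowCount++;
        return current.read();
    }
}
```

Because the entity is then built from the stream with an unknown content length (-1), HttpClient 4.x sends the body with chunked transfer encoding and never needs the whole batch in memory:

```java
// Fragment: uses org.apache.http.client.methods.HttpPut and
// org.apache.http.entity.InputStreamEntity; streamLoadUrl,
// partitionIterator, and batchSize come from the surrounding context.
HttpPut put = new HttpPut(streamLoadUrl);
put.setEntity(new InputStreamEntity(
        new RowBatchInputStream(partitionIterator, batchSize), -1));
```

A production version would also override read(byte[], int, int) so the body is copied in buffers rather than byte by byte.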
## Test results
Environment information
- Single data size: about 8KB
- Spark resources:
  - executor instances: 1
  - executor cores: 1
  - executor memory: varies per test (see below)
- Job configuration
  - read
    - doris.batch.size = 10000
    - doris.request.tablet.size = 4
  - write
    - sink.properties.parallelism = 5
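
For context, a hedged sketch of how the options above could be wired into a Spark job with this connector (Java API); the fenodes, table identifiers, and credentials are placeholders for this sketch, not the actual test environment:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DorisBatchSizeTest {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("doris-batch-size-test").getOrCreate();

        // Read side: source batch size and tablet partitioning as listed above.
        Dataset<Row> df = spark.read()
                .format("doris")
                .option("doris.fenodes", "fe_host:8030")           // placeholder
                .option("doris.table.identifier", "db.src_table")  // placeholder
                .option("user", "root")                            // placeholder
                .option("password", "")                            // placeholder
                .option("doris.batch.size", "10000")
                .option("doris.request.tablet.size", "4")
                .load();

        // Write side: sink.batch.size is the variable changed across Tests 1-6.
        df.write()
                .format("doris")
                .option("doris.fenodes", "fe_host:8030")           // placeholder
                .option("doris.table.identifier", "db.dst_table")  // placeholder
                .option("user", "root")                            // placeholder
                .option("password", "")                            // placeholder
                .option("sink.batch.size", "100000")
                .option("sink.properties.parallelism", "5")
                .save();

        spark.stop();
    }
}
```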
### Test 1
- Spark Executor memory: 1GB
- sink.batch.size = 100000
- Before optimization:

- After optimization:

### Test 2
- Spark Executor memory: 1GB
- sink.batch.size = 200000
- Before optimization: not performed
- After optimization:

### Test 3
- Spark Executor memory: 1GB
- sink.batch.size = 500000
- Before optimization: not performed
- After optimization:

### Test 4
- Spark Executor memory: 2GB
- sink.batch.size = 100000
- Before optimization:

- After optimization: not performed
### Test 5
- Spark Executor memory: 4GB
- sink.batch.size = 100000
- Before optimization:

- After optimization: not performed
### Test 6
- Executor memory: 16GB
- sink.batch.size = 100000
- Before optimization:

- After optimization: not performed
## Test summary
According to the test results, the optimized connector's memory usage stays relatively stable as long as the per-batch read size on the source side is unchanged, and the write batch size has little impact on memory usage. The optimization also alleviates slow data processing caused by high CPU usage from frequent GC when memory is insufficient.
## Checklist (Required)
1. Does it affect the original behavior: (Yes/No/I Don't know)
2. Have unit tests been added: (Yes/No/No Need)
3. Has documentation been added or modified: (Yes/No/No Need)
4. Does it need to update dependencies: (Yes/No)
5. Are there any changes that cannot be rolled back: (Yes/No)
## Further comments
If this is a relatively large or complex change, kick off the discussion at
[[email protected]](mailto:[email protected]) by explaining why you
chose the solution you did and what alternatives you considered, etc...