wenzhenghu commented on issue #63325:
URL: https://github.com/apache/doris/issues/63325#issuecomment-4470827857
Below are the reproduction script and the exact command lines used, for completeness.
The real host/IP has been anonymized as `<REDACTED_HOST>`; substitute your own FE/BE address before running the commands.
## Reproduction Script
```python
#!/usr/bin/env python3
import argparse
import base64
import http.client
import json
import sys
import time
import uuid
import pymysql
import requests
def _positive_int(text):
    """Parse an argparse option value that must be an integer greater than zero.

    Raises:
        argparse.ArgumentTypeError: if the parsed integer is <= 0. argparse
            turns this (and the ValueError raised by ``int``) into a usage
            error with exit status 2.
    """
    value = int(text)
    if value <= 0:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def parse_args():
    """Parse the command-line options of the stream load redirect repro.

    Returns:
        argparse.Namespace holding the connection settings (host, MySQL/FE/BE
        ports, user, password), the target db/table, the request target
        (``fe`` or ``be``), the payload shaping knobs (payload_mb, chunk_kb,
        sleep_ms), the HTTP timeouts, the label prefix, the HTTP client choice
        and the truncate/row-count switches.

    ``--chunk-kb`` and both timeouts are validated as positive. The upload
    generator cannot make progress with a non-positive chunk size, and a
    zero socket timeout would put sockets in non-blocking mode.
    """
    parser = argparse.ArgumentParser(
        description="Send a chunked stream load request to FE or BE with controllable pacing."
    )
    parser.add_argument("--host", default="<REDACTED_HOST>", help="Doris FE/BE host")
    parser.add_argument("--mysql-port", type=int, default=9034, help="Doris MySQL port")
    parser.add_argument("--fe-http-port", type=int, default=8034, help="Doris FE HTTP port")
    parser.add_argument("--be-http-port", type=int, default=8044, help="Doris BE HTTP port")
    parser.add_argument("--user", default="root", help="Doris user")
    parser.add_argument("--password", default="", help="Doris password")
    parser.add_argument("--db", default="wzhtest", help="Target database")
    parser.add_argument("--table", default="stream_load_redirect_repro", help="Target table")
    parser.add_argument(
        "--target",
        choices=["fe", "be"],
        default="fe",
        help="Choose whether the request goes to FE or BE HTTP port",
    )
    parser.add_argument(
        "--payload-mb",
        type=int,
        default=8,
        help="Approximate payload size in MiB generated by the script",
    )
    parser.add_argument(
        "--chunk-kb",
        type=_positive_int,
        default=16,
        help="Application chunk size in KiB for generator-based upload",
    )
    parser.add_argument(
        "--sleep-ms",
        type=int,
        default=0,
        help="Sleep duration between chunks to enlarge the redirect race window",
    )
    parser.add_argument(
        "--connect-timeout",
        type=_positive_int,
        default=5,
        help="HTTP connect timeout in seconds",
    )
    parser.add_argument(
        "--read-timeout",
        type=_positive_int,
        default=600,
        help="HTTP read timeout in seconds",
    )
    parser.add_argument(
        "--label-prefix",
        default="stream_load_redirect_repro",
        help="Prefix used to build a unique stream load label",
    )
    parser.add_argument(
        "--client",
        choices=["requests", "httpclient"],
        default="requests",
        help="Choose the Python HTTP stack used to send the stream load request",
    )
    parser.add_argument(
        "--truncate-before",
        action="store_true",
        help="Truncate the target table before sending the stream load request",
    )
    parser.add_argument(
        "--show-row-count",
        action="store_true",
        help="Query and print the target table row count after the request",
    )
    return parser.parse_args()
def build_auth_header(user, password):
    """Return the value of an HTTP Basic ``Authorization`` header.

    The ``user:password`` pair is UTF-8 encoded, base64 encoded, and prefixed
    with ``Basic ``.
    """
    credentials = f"{user}:{password}".encode("utf-8")
    encoded = base64.b64encode(credentials).decode("ascii")
    return "Basic " + encoded
def connect_mysql(args):
    """Open an autocommit pymysql connection to the Doris MySQL endpoint.

    Connects to ``args.host:args.mysql_port`` with the CLI credentials. The
    timeouts are fixed: 5 for connect, 10 for read and write.
    """
    options = dict(
        host=args.host,
        port=args.mysql_port,
        user=args.user,
        password=args.password,
        connect_timeout=5,
        read_timeout=10,
        write_timeout=10,
        autocommit=True,
    )
    return pymysql.connect(**options)
def truncate_table(args):
    """Empty ``args.db``.``args.table`` with ``TRUNCATE TABLE`` over MySQL."""
    with connect_mysql(args) as conn, conn.cursor() as cur:
        statement = f"TRUNCATE TABLE `{args.db}`.`{args.table}`"
        cur.execute(statement)
def query_row_count(args):
    """Return ``COUNT(*)`` of ``args.db``.``args.table`` via the MySQL protocol."""
    with connect_mysql(args) as conn, conn.cursor() as cur:
        sql = f"SELECT COUNT(*) FROM `{args.db}`.`{args.table}`"
        cur.execute(sql)
        first_row = cur.fetchone()
        return first_row[0]
def build_csv_chunk(chunk_bytes, chunk_index):
    """Build one CSV chunk of at most ``chunk_bytes`` bytes (UTF-8).

    Each row is ``<id>,payload_<chunk_index>_<x...>\\n``. Ids start at
    ``chunk_index * 100000``. The filler width is ``chunk_bytes // 8``
    clamped to [8, 512].

    Rows are added while they still fit. The first row is always emitted,
    so the result exceeds ``chunk_bytes`` only when a single row is larger
    than the budget.
    """
    filler = "x" * max(8, min(512, chunk_bytes // 8))
    row_id = chunk_index * 100000
    encoded_rows = []
    used = 0
    while True:
        line = f"{row_id},payload_{chunk_index}_{filler}\n".encode("utf-8")
        if encoded_rows and used + len(line) > chunk_bytes:
            break
        encoded_rows.append(line)
        used += len(line)
        row_id += 1
        if used >= chunk_bytes:
            break
    return b"".join(encoded_rows)
def chunked_csv_generator(total_bytes, chunk_bytes, sleep_ms):
    """Yield CSV chunks until a budget of ``total_bytes`` has been requested.

    Each step asks ``build_csv_chunk`` for ``min(chunk_bytes, remaining)``
    bytes. ``sent`` tracks the requested budget, not the exact size of the
    bytes returned, so the real payload size is approximate. When
    ``sleep_ms > 0`` the generator sleeps after every chunk, including the
    last one, before finishing.

    Raises:
        ValueError: if ``chunk_bytes`` is not positive. Because this is a
            generator, the error surfaces on the first ``next()``. Without
            the check the loop never advances ``sent`` and runs forever.
    """
    if chunk_bytes <= 0:
        raise ValueError(f"chunk_bytes must be positive, got {chunk_bytes}")
    sent = 0
    chunk_index = 0
    while sent < total_bytes:
        remaining = total_bytes - sent
        current_size = min(chunk_bytes, remaining)
        yield build_csv_chunk(current_size, chunk_index)
        sent += current_size
        chunk_index += 1
        if sleep_ms > 0:
            # Pacing between chunks widens the window in which the server
            # can react while the client is still writing.
            time.sleep(sleep_ms / 1000.0)
def stream_load(args):
    """Send one chunked stream load to the FE or BE chosen by ``args.target``.

    Builds the ``/api/<db>/<table>/_stream_load`` URL, a unique label and the
    request headers. When ``args.client == "httpclient"`` it delegates to
    ``stream_load_with_httpclient``. Otherwise it streams the generated CSV
    with ``requests``, passing ``allow_redirects=False`` so a redirect from
    the server is reported rather than followed.

    Returns:
        0 when an HTTP response was received (whatever its status code), or
        1 when the request raised. A JSON summary is printed in both cases.
    """
    port = args.fe_http_port if args.target == "fe" else args.be_http_port
    url = f"http://{args.host}:{port}/api/{args.db}/{args.table}/_stream_load"
    label = f"{args.label_prefix}_{args.target}_{uuid.uuid4().hex[:12]}"
    total_bytes = args.payload_mb * 1024 * 1024
    chunk_bytes = args.chunk_kb * 1024
    headers = {
        "Authorization": build_auth_header(args.user, args.password),
        "Expect": "100-continue",
        "column_separator": ",",
        "label": label,
    }
    if args.client == "httpclient":
        return stream_load_with_httpclient(
            args, url, label, total_bytes, chunk_bytes, headers
        )
    # Use a monotonic clock for durations; wall-clock time can jump.
    started = time.monotonic()
    with requests.Session() as session:
        try:
            response = session.put(
                url=url,
                headers=headers,
                # A generator body makes requests use chunked transfer encoding.
                data=chunked_csv_generator(total_bytes, chunk_bytes, args.sleep_ms),
                allow_redirects=False,
                timeout=(args.connect_timeout, args.read_timeout),
            )
            elapsed = time.monotonic() - started
            print(json.dumps(
                {
                    "target": args.target,
                    "url": url,
                    "label": label,
                    "status_code": response.status_code,
                    "elapsed_seconds": round(elapsed, 3),
                    "headers": dict(response.headers),
                    "body": response.text[:2000],
                },
                ensure_ascii=False,
                indent=2,
            ))
            return 0
        except Exception as exc:
            # Top-level boundary of the repro: report any failure (e.g. a
            # connection reset mid-upload) as JSON instead of a traceback.
            elapsed = time.monotonic() - started
            print(json.dumps(
                {
                    "target": args.target,
                    "url": url,
                    "label": label,
                    "elapsed_seconds": round(elapsed, 3),
                    "exception_type": type(exc).__name__,
                    "exception": repr(exc),
                },
                ensure_ascii=False,
                indent=2,
            ))
            return 1
def stream_load_with_httpclient(args, url, label, total_bytes, chunk_bytes, headers):
    """Upload the stream load with ``http.client`` and hand-written chunked framing.

    Each generated chunk is written to the socket as
    ``<hex length>\\r\\n<data>\\r\\n``, followed by the ``0\\r\\n\\r\\n``
    terminator. The response is read only after the whole body has been
    sent. If the server closes the connection mid-upload, the failure is
    therefore expected to show up while sending (e.g. ``BrokenPipeError`` or
    ``ConnectionResetError``) rather than as an HTTP response.

    Args:
        args: parsed CLI namespace (host, ports, target, db, table, timeouts,
            sleep_ms, client).
        url: full request URL; only echoed in the printed summary.
        label: stream load label sent in the ``label`` header.
        total_bytes: payload budget passed to ``chunked_csv_generator``.
        chunk_bytes: per-chunk size passed to ``chunked_csv_generator``.
        headers: must contain ``Authorization``, ``Expect`` and
            ``column_separator``.

    Returns:
        0 if a response was read (any status), or 1 if an exception occurred.
        A JSON summary is printed either way.
    """
    port = args.fe_http_port if args.target == "fe" else args.be_http_port
    parsed_path = f"/api/{args.db}/{args.table}/_stream_load"
    # The constructor opens no socket; conn.connect() below applies this
    # timeout to the TCP handshake only.
    conn = http.client.HTTPConnection(args.host, port, timeout=args.connect_timeout)
    started = time.monotonic()
    try:
        conn.connect()
        # After connecting, switch to --read-timeout for the upload and the
        # response wait. This mirrors the (connect, read) timeout pair used on
        # the requests path; previously --connect-timeout was ignored here.
        conn.sock.settimeout(args.read_timeout)
        conn.putrequest("PUT", parsed_path)
        conn.putheader("Authorization", headers["Authorization"])
        conn.putheader("Expect", headers["Expect"])
        conn.putheader("column_separator", headers["column_separator"])
        conn.putheader("label", label)
        conn.putheader("Transfer-Encoding", "chunked")
        conn.endheaders()
        for chunk in chunked_csv_generator(total_bytes, chunk_bytes, args.sleep_ms):
            # HTTP/1.1 chunk framing: hex size line, data, CRLF.
            conn.send(f"{len(chunk):X}\r\n".encode("ascii"))
            conn.send(chunk)
            conn.send(b"\r\n")
        # Zero-length last chunk ends the chunked body.
        conn.send(b"0\r\n\r\n")
        response = conn.getresponse()
        body = response.read().decode("utf-8", errors="replace")
        elapsed = time.monotonic() - started
        print(json.dumps(
            {
                "target": args.target,
                "url": url,
                "label": label,
                "client": args.client,
                "status_code": response.status,
                "elapsed_seconds": round(elapsed, 3),
                "headers": dict(response.getheaders()),
                "body": body[:2000],
            },
            ensure_ascii=False,
            indent=2,
        ))
        return 0
    except Exception as exc:
        # Top-level boundary of the repro: report the failure as JSON.
        elapsed = time.monotonic() - started
        print(json.dumps(
            {
                "target": args.target,
                "url": url,
                "label": label,
                "client": args.client,
                "elapsed_seconds": round(elapsed, 3),
                "exception_type": type(exc).__name__,
                "exception": repr(exc),
            },
            ensure_ascii=False,
            indent=2,
        ))
        return 1
    finally:
        conn.close()
def main():
    """Run the repro: optional truncate, one stream load, optional row count.

    Returns the exit code produced by ``stream_load``. A failure while
    counting rows is printed as ``row_count_error`` and does not change the
    exit code.
    """
    args = parse_args()
    if args.truncate_before:
        truncate_table(args)
    exit_code = stream_load(args)
    if not args.show_row_count:
        return exit_code
    try:
        print(json.dumps({"row_count": query_row_count(args)}, ensure_ascii=False))
    except Exception as exc:
        print(json.dumps({"row_count_error": repr(exc)}, ensure_ascii=False))
    return exit_code
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
```
## Reproduction Commands
### Doris 3.0 FE: receives a normal 307 redirect
```bash
# Baseline against Doris 3.0 (MySQL 9030, FE HTTP 8030, BE HTTP 8040):
# 1 MiB payload sent to the FE in 1 KiB chunks via http.client, no pacing.
# Replace <REDACTED_HOST> first: left as-is, the shell parses the angle
# brackets as I/O redirections.
python stream_load_redirect_repro.py \
  --client httpclient \
  --host <REDACTED_HOST> \
  --mysql-port 9030 \
  --fe-http-port 8030 \
  --be-http-port 8040 \
  --db wzh \
  --table stream_load_redirect_repro \
  --target fe \
  --payload-mb 1 \
  --chunk-kb 1 \
  --sleep-ms 0 \
  --truncate-before \
  --show-row-count
```
Expected result: FE returns `307 Temporary Redirect`.
### Doris master FE: reproduces connection reset / broken pipe
```bash
# Same payload shape (1 MiB, 1 KiB chunks, no pacing) against a Doris master
# build (MySQL 9034, FE HTTP 8034, BE HTTP 8044), targeting the FE.
# Replace <REDACTED_HOST> first: left as-is, the shell parses the angle
# brackets as I/O redirections.
python stream_load_redirect_repro.py \
  --client httpclient \
  --host <REDACTED_HOST> \
  --mysql-port 9034 \
  --fe-http-port 8034 \
  --be-http-port 8044 \
  --db wzhtest \
  --table stream_load_redirect_repro \
  --target fe \
  --payload-mb 1 \
  --chunk-kb 1 \
  --sleep-ms 0 \
  --truncate-before \
  --show-row-count
```
Typical result on `master`: instead of receiving a 307 redirect, the client fails with `ConnectionResetError`.
The following variant reproduces `BrokenPipeError` more reliably by enlarging the write window (an 8 MiB payload sent in 16 KiB chunks with a 10 ms sleep between chunks):
```bash
# Wider write window against Doris master: 8 MiB payload in 16 KiB chunks with
# a 10 ms sleep after each chunk, so the client keeps writing for longer.
# Replace <REDACTED_HOST> first: left as-is, the shell parses the angle
# brackets as I/O redirections.
python stream_load_redirect_repro.py \
  --client httpclient \
  --host <REDACTED_HOST> \
  --mysql-port 9034 \
  --fe-http-port 8034 \
  --be-http-port 8044 \
  --db wzhtest \
  --table stream_load_redirect_repro \
  --target fe \
  --payload-mb 8 \
  --chunk-kb 16 \
  --sleep-ms 10 \
  --truncate-before \
  --show-row-count
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]