wenzhenghu commented on issue #63325:
URL: https://github.com/apache/doris/issues/63325#issuecomment-4470827857

   Adding the reproduction script and exact command lines here for completeness.
   The real host/IP is intentionally anonymized as `<REDACTED_HOST>`.
   
   ## Reproduction Script
   
   ```python
   #!/usr/bin/env python3
   import argparse
   import base64
   import http.client
   import json
   import sys
   import time
   import uuid
   
   import pymysql
   import requests
   
   
   def parse_args():
       """Parse the command-line options of the stream load repro script.
   
       Every help/description string is written either on one short line or
       as adjacent string literals inside parentheses, so that hard-wrapping
       the script at 80 columns (as mail archives do) cannot split a string
       literal and make the script unparsable.
   
       Returns:
           argparse.Namespace: parsed options (host and ports, credentials,
           target database/table, payload size, chunk size, per-chunk sleep,
           HTTP timeouts, HTTP client choice, truncate/row-count flags).
       """
       parser = argparse.ArgumentParser(
           description=(
               "Send a chunked stream load request to FE or BE with "
               "controllable pacing."
           )
       )
       parser.add_argument(
           "--host",
           default="<REDACTED_HOST>",
           help="Doris FE/BE host",
       )
       parser.add_argument(
           "--mysql-port",
           type=int,
           default=9034,
           help="Doris MySQL port",
       )
       parser.add_argument(
           "--fe-http-port",
           type=int,
           default=8034,
           help="Doris FE HTTP port",
       )
       parser.add_argument(
           "--be-http-port",
           type=int,
           default=8044,
           help="Doris BE HTTP port",
       )
       parser.add_argument("--user", default="root", help="Doris user")
       parser.add_argument("--password", default="", help="Doris password")
       parser.add_argument("--db", default="wzhtest", help="Target database")
       parser.add_argument(
           "--table",
           default="stream_load_redirect_repro",
           help="Target table",
       )
       parser.add_argument(
           "--target",
           choices=["fe", "be"],
           default="fe",
           help="Choose whether the request goes to FE or BE HTTP port",
       )
       parser.add_argument(
           "--payload-mb",
           type=int,
           default=8,
           help="Approximate payload size in MiB generated by the script",
       )
       parser.add_argument(
           "--chunk-kb",
           type=int,
           default=16,
           help="Application chunk size in KiB for generator-based upload",
       )
       parser.add_argument(
           "--sleep-ms",
           type=int,
           default=0,
           help=(
               "Sleep duration between chunks to enlarge the redirect race "
               "window"
           ),
       )
       parser.add_argument(
           "--connect-timeout",
           type=int,
           default=5,
           help="HTTP connect timeout in seconds",
       )
       parser.add_argument(
           "--read-timeout",
           type=int,
           default=600,
           help="HTTP read timeout in seconds",
       )
       parser.add_argument(
           "--label-prefix",
           default="stream_load_redirect_repro",
           help="Prefix used to build a unique stream load label",
       )
       parser.add_argument(
           "--client",
           choices=["requests", "httpclient"],
           default="requests",
           help=(
               "Choose the Python HTTP stack used to send the stream load "
               "request"
           ),
       )
       parser.add_argument(
           "--truncate-before",
           action="store_true",
           help=(
               "Truncate the target table before sending the stream load "
               "request"
           ),
       )
       parser.add_argument(
           "--show-row-count",
           action="store_true",
           help="Query and print the target table row count after the request",
       )
       return parser.parse_args()
   
   
   def build_auth_header(user, password):
       """Build the value of an HTTP Basic ``Authorization`` header.
   
       Args:
           user: Doris user name.
           password: Doris password; may be an empty string.
   
       Returns:
           str: ``"Basic "`` followed by the base64 encoding of the UTF-8
           bytes of ``user:password``.
       """
       # Two short statements instead of one long one: the original single
       # assignment was hard-wrapped right after ``=`` and no longer parsed.
       credentials = f"{user}:{password}".encode("utf-8")
       token = base64.b64encode(credentials).decode("ascii")
       return f"Basic {token}"
   
   
   def connect_mysql(args):
       """Open a pymysql connection using the parsed command-line options.
   
       Host, port, user and password come from ``args``. The connection uses
       a 5 second connect timeout, 10 second read/write timeouts and has
       autocommit enabled. No default database is selected.
   
       Args:
           args: namespace providing ``host``, ``mysql_port``, ``user`` and
               ``password``.
   
       Returns:
           The object returned by ``pymysql.connect``.
       """
       options = {
           "host": args.host,
           "port": args.mysql_port,
           "user": args.user,
           "password": args.password,
           "connect_timeout": 5,
           "read_timeout": 10,
           "write_timeout": 10,
           "autocommit": True,
       }
       return pymysql.connect(**options)
   
   
   def truncate_table(args):
       """Truncate the target table ``args.db``.``args.table``.
   
       Args:
           args: parsed options; also passed to ``connect_mysql``.
       """
       # Identifiers cannot be bound as query parameters, so the statement
       # is formatted with back-quoted database and table names.
       statement = f"TRUNCATE TABLE `{args.db}`.`{args.table}`"
       with connect_mysql(args) as conn, conn.cursor() as cur:
           cur.execute(statement)
   
   
   def query_row_count(args):
       """Return the result of ``SELECT COUNT(*)`` on the target table.
   
       Args:
           args: parsed options; also passed to ``connect_mysql``.
   
       Returns:
           The first column of the single row fetched from the count query.
       """
       statement = f"SELECT COUNT(*) FROM `{args.db}`.`{args.table}`"
       with connect_mysql(args) as conn, conn.cursor() as cur:
           cur.execute(statement)
           first_row = cur.fetchone()
           return first_row[0]
   
   
   def build_csv_chunk(chunk_bytes, chunk_index):
       """Build one block of CSV rows of at most about ``chunk_bytes`` bytes.
   
       Each row is ``<row id>,payload_<chunk_index>_<filler>`` plus a newline.
       Row ids start at ``chunk_index * 100000`` and the filler is a run of
       ``x`` whose length is ``chunk_bytes // 8`` clamped to 8..512.
   
       At least one row is always produced, even when it is larger than
       ``chunk_bytes``; after that, rows are added only while they still fit.
   
       Args:
           chunk_bytes: target size of the chunk in bytes.
           chunk_index: index of the chunk; used in row ids and payload text.
   
       Returns:
           bytes: the UTF-8 encoded rows, concatenated.
       """
       filler = "x" * max(8, min(512, chunk_bytes // 8))
       suffix = f",payload_{chunk_index}_{filler}\n"
       next_id = chunk_index * 100000
   
       # The first row is unconditional so the chunk is never empty.
       first = f"{next_id}{suffix}"
       lines = [first]
       used = len(first.encode("utf-8"))
       next_id += 1
   
       while used < chunk_bytes:
           line = f"{next_id}{suffix}"
           size = len(line.encode("utf-8"))
           if used + size > chunk_bytes:
               break
           lines.append(line)
           used += size
           next_id += 1
       return "".join(lines).encode("utf-8")
   
   
   def chunked_csv_generator(total_bytes, chunk_bytes, sleep_ms):
       """Yield CSV chunks until about ``total_bytes`` have been requested.
   
       Progress is counted by the size *requested* for each chunk, not by the
       length of the bytes returned by ``build_csv_chunk``, so the real
       payload size is only approximately ``total_bytes``.
   
       Args:
           total_bytes: approximate total payload size in bytes.
           chunk_bytes: requested size of each chunk in bytes; the last chunk
               is requested with whatever remains.
           sleep_ms: milliseconds to sleep after every yielded chunk
               (including the last one); values <= 0 disable the sleep.
   
       Yields:
           bytes: one CSV chunk per iteration.
   
       Raises:
           ValueError: on first iteration, if ``chunk_bytes`` is not positive
               while ``total_bytes`` is. Without this check the progress
               counter never advances and the generator yields forever.
       """
       if total_bytes > 0 and chunk_bytes <= 0:
           raise ValueError(
               f"chunk_bytes must be positive, got {chunk_bytes}"
           )
       sent = 0
       chunk_index = 0
       while sent < total_bytes:
           remaining = total_bytes - sent
           current_size = min(chunk_bytes, remaining)
           yield build_csv_chunk(current_size, chunk_index)
           sent += current_size
           chunk_index += 1
           if sleep_ms > 0:
               time.sleep(sleep_ms / 1000.0)
   
   
   def stream_load(args):
       """Send one stream load PUT request and print a JSON report.
   
       The request goes to the FE or BE HTTP port depending on
       ``args.target``. When ``args.client`` is ``"httpclient"`` the work is
       delegated to ``stream_load_with_httpclient`` and its return value is
       returned. Otherwise the request is sent through a
       ``requests.Session`` with a generator as body and with redirects
       disabled, so a redirect answer is reported instead of followed.
   
       Args:
           args: parsed command-line options from ``parse_args``.
   
       Returns:
           int: for the ``requests`` client, 0 when a response was received
           (whatever its status code) and 1 when sending raised an exception.
       """
       port = args.fe_http_port if args.target == "fe" else args.be_http_port
       # Built in two short statements: the original one-line assignment was
       # hard-wrapped right after ``=`` and no longer parsed.
       path = f"/api/{args.db}/{args.table}/_stream_load"
       url = f"http://{args.host}:{port}{path}"
       label = f"{args.label_prefix}_{args.target}_{uuid.uuid4().hex[:12]}"
       total_bytes = args.payload_mb * 1024 * 1024
       chunk_bytes = args.chunk_kb * 1024
   
       headers = {
           "Authorization": build_auth_header(args.user, args.password),
           "Expect": "100-continue",
           "column_separator": ",",
           "label": label,
       }
   
       if args.client == "httpclient":
           return stream_load_with_httpclient(
               args, url, label, total_bytes, chunk_bytes, headers
           )
   
       session = requests.Session()
       started = time.time()
       try:
           response = session.put(
               url=url,
               headers=headers,
               data=chunked_csv_generator(
                   total_bytes, chunk_bytes, args.sleep_ms
               ),
               allow_redirects=False,
               timeout=(args.connect_timeout, args.read_timeout),
           )
           elapsed = time.time() - started
           print(json.dumps(
               {
                   "target": args.target,
                   "url": url,
                   "label": label,
                   "status_code": response.status_code,
                   "elapsed_seconds": round(elapsed, 3),
                   "headers": dict(response.headers),
                   # Only the first 2000 characters of the body are shown.
                   "body": response.text[:2000],
               },
               ensure_ascii=False,
               indent=2,
           ))
           return 0
       except Exception as exc:
           # Top-level boundary of the repro: any failure (connection reset,
           # broken pipe, timeout, ...) is reported as JSON, not re-raised.
           elapsed = time.time() - started
           print(json.dumps(
               {
                   "target": args.target,
                   "url": url,
                   "label": label,
                   "elapsed_seconds": round(elapsed, 3),
                   "exception_type": type(exc).__name__,
                   "exception": repr(exc),
               },
               ensure_ascii=False,
               indent=2,
           ))
           return 1
       finally:
           session.close()
   
   
   def stream_load_with_httpclient(
       args, url, label, total_bytes, chunk_bytes, headers
   ):
       """Send the stream load request with ``http.client`` and report it.
   
       The request is written by hand with ``Transfer-Encoding: chunked``:
       the body chunks are sent right after the headers, each framed as
       ``<hex size>CRLF<data>CRLF`` and followed by the terminating
       zero-size chunk. The outcome is printed as a JSON document.
   
       Args:
           args: parsed options (target, host, ports, db, table, sleep_ms,
               read_timeout, client).
           url: request URL; only used in the printed report.
           label: stream load label, sent as the ``label`` header.
           total_bytes: approximate payload size in bytes.
           chunk_bytes: requested size of each body chunk in bytes.
           headers: mapping providing the ``Authorization``, ``Expect`` and
               ``column_separator`` header values.
   
       Returns:
           int: 0 when a response was read, 1 when an exception was raised.
       """
       if args.target == "fe":
           http_port = args.fe_http_port
       else:
           http_port = args.be_http_port
       request_path = f"/api/{args.db}/{args.table}/_stream_load"
       # ``args.read_timeout`` is the only timeout given to the connection.
       connection = http.client.HTTPConnection(
           args.host, http_port, timeout=args.read_timeout
       )
       began = time.time()
       try:
           connection.putrequest("PUT", request_path)
           for name in ("Authorization", "Expect", "column_separator"):
               connection.putheader(name, headers[name])
           connection.putheader("label", label)
           connection.putheader("Transfer-Encoding", "chunked")
           connection.endheaders()
   
           body_chunks = chunked_csv_generator(
               total_bytes, chunk_bytes, args.sleep_ms
           )
           for piece in body_chunks:
               # Size line, data and trailing CRLF are three separate sends.
               size_line = f"{len(piece):X}\r\n".encode("ascii")
               connection.send(size_line)
               connection.send(piece)
               connection.send(b"\r\n")
           connection.send(b"0\r\n\r\n")
   
           reply = connection.getresponse()
           text = reply.read().decode("utf-8", errors="replace")
           took = time.time() - began
           report = {
               "target": args.target,
               "url": url,
               "label": label,
               "client": args.client,
               "status_code": reply.status,
               "elapsed_seconds": round(took, 3),
               "headers": dict(reply.getheaders()),
               "body": text[:2000],
           }
           print(json.dumps(report, ensure_ascii=False, indent=2))
           return 0
       except Exception as exc:
           took = time.time() - began
           failure = {
               "target": args.target,
               "url": url,
               "label": label,
               "client": args.client,
               "elapsed_seconds": round(took, 3),
               "exception_type": type(exc).__name__,
               "exception": repr(exc),
           }
           print(json.dumps(failure, ensure_ascii=False, indent=2))
           return 1
       finally:
           connection.close()
   
   
   def main():
       """Run the repro: optional truncate, stream load, optional row count.
   
       Returns:
           int: the value returned by ``stream_load``. A failure of the
           optional row-count query is printed as JSON and does not change
           the return value.
       """
       args = parse_args()
       if args.truncate_before:
           truncate_table(args)
       exit_code = stream_load(args)
       if not args.show_row_count:
           return exit_code
       try:
           summary = {"row_count": query_row_count(args)}
           print(json.dumps(summary, ensure_ascii=False))
       except Exception as exc:
           failure = {"row_count_error": repr(exc)}
           print(json.dumps(failure, ensure_ascii=False))
       return exit_code
   
   
   # Script entry point: the value returned by main() becomes the process
   # exit status.
   if __name__ == "__main__":
       raise SystemExit(main())
   ```
   
   ## Reproduction Commands
   
   ### Doris 3.0 FE: receives a normal 307 redirect
   
   ```bash
   python stream_load_redirect_repro.py \
     --client httpclient \
     --host <REDACTED_HOST> \
     --mysql-port 9030 \
     --fe-http-port 8030 \
     --be-http-port 8040 \
     --db wzh \
     --table stream_load_redirect_repro \
     --target fe \
     --payload-mb 1 \
     --chunk-kb 1 \
     --sleep-ms 0 \
     --truncate-before \
     --show-row-count
   ```
   
   Expected result: FE returns `307 Temporary Redirect`.
   
   ### Doris master FE: reproduces connection reset / broken pipe
   
   ```bash
   python stream_load_redirect_repro.py \
     --client httpclient \
     --host <REDACTED_HOST> \
     --mysql-port 9034 \
     --fe-http-port 8034 \
     --be-http-port 8044 \
     --db wzhtest \
     --table stream_load_redirect_repro \
     --target fe \
     --payload-mb 1 \
     --chunk-kb 1 \
     --sleep-ms 0 \
     --truncate-before \
     --show-row-count
   ```
   
   Typical result on `master`: `ConnectionResetError`.
   
   The following variant can reproduce `BrokenPipeError` more aggressively by 
increasing the write window:
   
   ```bash
   python stream_load_redirect_repro.py \
     --client httpclient \
     --host <REDACTED_HOST> \
     --mysql-port 9034 \
     --fe-http-port 8034 \
     --be-http-port 8044 \
     --db wzhtest \
     --table stream_load_redirect_repro \
     --target fe \
     --payload-mb 8 \
     --chunk-kb 16 \
     --sleep-ms 10 \
     --truncate-before \
     --show-row-count
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to