> Thinking a bit outside the box..  I was wondering about a plan D (plan
> A is what's on HEAD, plan B is copying around the query string, plan C
> this Vfd approach) where we shut down the executor when we have
> finished the execution of an unnamed portal, with something like the
> attached based on a more aggressive PortalCleanup().

I am not sure I like this idea as-is. Besides the fact that it is
still a wire-level change, it is not safe at all to re-enter
exec_execute_message after you have cleaned up the portal but
not dropped it.

if (!PortalIsValid(portal)) will tell you the portal is still valid, but its
resources, such as queryDesc and others, are no longer available.
You can see what happens with the attached extended.py, which
communicates directly over the wire protocol.
See the "back-to-back execute" print.
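
For reference, here is a minimal sketch of the two Execute messages that the script sends back to back. The helper name build_execute is hypothetical. The layout follows the protocol's Execute message: a type byte, an int32 length, the portal name as a C string, then an int32 row limit.

```python
import struct

def build_execute(portal_name: str, max_rows: int) -> bytes:
    # Hypothetical helper, for illustration only.
    # Body: portal name as a C string, then an int32 row limit
    # (0 would mean "no limit").
    body = portal_name.encode() + b"\x00" + struct.pack("!I", max_rows)
    # Message: type byte 'E', int32 length (counts itself, not the type byte), body.
    return b"E" + struct.pack("!I", len(body) + 4) + body

first = build_execute("", 7)    # up to 7 rows from the unnamed portal
second = build_execute("", 1)   # second Execute on the same unnamed portal
print(first.hex(), second.hex())
```

Both messages name the same unnamed portal, so the second one reaches exec_execute_message after the first has finished.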

It results in
```
TRAP: failed Assert("queryDesc || portal->holdStore"), File: "../src/backend/tcop/pquery.c", Line: 875, PID: 32358
0   postgres                            0x00000001028ce52c ExceptionalCondition + 108
1   postgres                            0x00000001027a2470 PortalRunMulti + 0
2   postgres                            0x00000001027a1fc0 PortalRun + 492
3   postgres                            0x000000010279ff54 PostgresMain + 8896
4   postgres                            0x0000000102799bbc BackendInitialize + 0
5   postgres                            0x00000001026f1264 postmaster_child_launch + 364
6   postgres                            0x00000001026f588c ServerLoop + 5840
7   postgres                            0x00000001026f3800 InitProcessGlobals + 0
8   postgres                            0x0000000102641564 help + 0
9   dyld                                0x00000001930d6b98 start + 6076
```
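
The failure mode can be pictured with a toy model. This is only a sketch in Python with hypothetical stand-ins (Portal, portal_is_valid, portal_cleanup, portal_run), not PostgreSQL code. It assumes the validity check only tests that the portal exists, and that cleanup releases the executor state without dropping the portal.

```python
# Toy model only: hypothetical Python stand-ins, not PostgreSQL code.

class Portal:
    def __init__(self, name):
        self.name = name
        self.query_desc = object()   # stands in for portal->queryDesc
        self.hold_store = None       # stands in for portal->holdStore


def portal_is_valid(portal):
    # Only checks that the portal exists, not that its resources do.
    return portal is not None


def portal_cleanup(portal):
    # Aggressive cleanup: executor state is released, the portal is not dropped.
    portal.query_desc = None


def portal_run(portal):
    # Same condition as the assertion in the trace.
    if portal.query_desc is None and portal.hold_store is None:
        raise AssertionError("queryDesc || portal->holdStore")
    return "rows"


portal = Portal("")                    # unnamed portal
first = portal_run(portal)             # first Execute succeeds
portal_cleanup(portal)                 # cleaned after execution, still registered
still_valid = portal_is_valid(portal)  # the validity check still passes

try:
    portal_run(portal)                 # second Execute finds no queryDesc
    second = "ok"
except AssertionError as exc:
    second = f"assert failed: {exc}"

print(first, still_valid, second)
```

In this model the second run fails even though the validity check passed, which is the shape of the problem with cleaning a portal but leaving it in place.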

This idea could perhaps work, but needs more thought.

> Perhaps the best answer for now is to revert and continue the
> discussion for this cycle as there seems to be little love for the
> current HEAD's solution with the protocol change.
>
> If folks have more ideas or input, please feel free.

That is probably the best idea now.

> This is new, attaching the information to a Vfd in fd.c.  Not sure
> that adding this information to this structure is a good concept.
> This layer of the code has no idea of query strings currently, so that
> feels a bit like a layer violation.

> Thanks for having a look! FWIW I found a way for Plan C to work without
> including tcopprot.h in fd.c, see attached.
>
> There's a new field in that structure indeed, but maybe not that far
> fetched, it's the query that triggered the creation of the file.

To Michael's point, this looks like a layering violation. Besides,
I think this will double-log, although I did not test it, because you log
the statement once when closing the temp file and once when logging
the STATEMENT.

One thing I am still not sure about: we currently say that things
like the query_string will outlive the portal, so it is still not clear
to me what the risk is of copying the query_string to debug_query_string
during PortalDrop.

```
/*
 * We don't have to copy anything into the portal, because everything
 * we are passing here is in MessageContext or the
 * per_parsetree_context, and so will outlive the portal anyway.
 */
PortalDefineQuery(portal,
                  NULL,
                  query_string,
                  commandTag,
                  plantree_list,
                  NULL);
```

--
Sami Imseih
Amazon Web Services (AWS)

import socket
import struct

HOST = "127.0.0.1"
PORT = 5432
USER = "postgres"
PASSWORD = "yourpassword"
DATABASE = "postgres"

def send_msg(sock, msg_type, payload):
    sock.sendall(msg_type.encode("ascii") + struct.pack("!I", len(payload) + 4) + payload)

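def recv_exact(sock, n):
    # recv() may return fewer than n bytes, so loop until all have arrived
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("server closed the connection")
        buf += chunk
    return buf

def recv_msg(sock):
    # Each message is a type byte plus an int32 length that counts itself
    msg_type = recv_exact(sock, 1)
    length = struct.unpack("!I", recv_exact(sock, 4))[0] - 4
    payload = recv_exact(sock, length)
    return msg_type, payload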

def read_cstring(data, offset):
    end = data.index(0, offset)
    return data[offset:end].decode(), end + 1

def startup(sock):
    params = {"user": USER, "database": DATABASE, "client_encoding": "UTF8"}
    body = b"".join(k.encode() + b"\x00" + v.encode() + b"\x00" for k, v in params.items()) + b"\x00"
    packet = struct.pack("!I", 196608) + body
    sock.sendall(struct.pack("!I", len(packet) + 4) + packet)

def authenticate(sock):
    while True:
        msg_type, data = recv_msg(sock)
        if msg_type == b"R":
            code = struct.unpack("!I", data[:4])[0]
            if code == 0:
                print("Authentication OK")
            elif code == 3:
                send_msg(sock, "p", PASSWORD.encode() + b"\x00")
                continue
            else:
                raise Exception(f"Unsupported auth method: {code}")
        elif msg_type in (b"K", b"S"):
            continue
        elif msg_type == b"Z":
            break

def parse_row_description(payload):
    col_count = struct.unpack("!H", payload[:2])[0]
    offset = 2
    columns = []
    for _ in range(col_count):
        name, offset = read_cstring(payload, offset)
        offset += 18
        columns.append(name)
    return columns

def parse_data_row(payload):
    field_count = struct.unpack("!H", payload[:2])[0]
    offset = 2
    fields = []
    for _ in range(field_count):
        (length,) = struct.unpack("!I", payload[offset:offset+4])
        offset += 4
        if length == 0xFFFFFFFF:
            fields.append(None)
        else:
            fields.append(payload[offset:offset+length].decode())
            offset += length
    return fields

def extended_query(sock, sql):
    # 1. Parse: unnamed statement
    payload = b"\x00" + sql.encode() + b"\x00" + struct.pack("!H", 0)  # no params
    send_msg(sock, "P", payload)

    # 2. Bind: unnamed portal + statement
    payload = b"\x00\x00" + struct.pack("!H", 0) + struct.pack("!H", 0) + struct.pack("!H", 0)
    send_msg(sock, "B", payload)

    # 3. Describe (portal)
    send_msg(sock, "D", b"P\x00")

    # 4. Execute: up to 7 rows from the unnamed portal
    send_msg(sock, "E", b"\x00" + struct.pack("!I", 7))
    print("back-to-back execute ......")
    # Second Execute on the same unnamed portal, up to 1 row
    send_msg(sock, "E", b"\x00" + struct.pack("!I", 1))

    # Optional second Describe (portal), disabled
    #send_msg(sock, "D", b"P\x00")

    # 5. Sync
    send_msg(sock, "S", b"")

    # Receive responses
    columns = []
    rows = []
    while True:
        msg_type, data = recv_msg(sock)
        if msg_type == b"1":  # ParseComplete
            continue
        elif msg_type == b"2":  # BindComplete
            continue
        elif msg_type == b"T":  # RowDescription
            columns = parse_row_description(data)
        elif msg_type == b"D":  # DataRow
            rows.append(parse_data_row(data))
        elif msg_type == b"C":  # CommandComplete
            continue
        elif msg_type == b"Z":  # ReadyForQuery
            break
        elif msg_type == b"E":  # ErrorResponse
            print("Error:", data)
            break

    for row in rows:
        print(dict(zip(columns, row)))

def main():
    with socket.create_connection((HOST, PORT)) as sock:
        startup(sock)
        authenticate(sock)
        extended_query(sock, "SELECT * from foo;")

if __name__ == "__main__":
    main()
