Changeset: 9a93a50925ef for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/9a93a50925ef Modified Files: common/stream/mapi_stream.c common/stream/socket_stream.c monetdb5/mal/mal_client.c monetdb5/modules/mal/tablet.c sql/backends/monet5/sql.c sql/server/sql_scan.c Branch: Aug2024 Log Message:
Fix a problem where interrupting a query halfway through from the client didn't work properly. diffs (185 lines): diff --git a/common/stream/mapi_stream.c b/common/stream/mapi_stream.c --- a/common/stream/mapi_stream.c +++ b/common/stream/mapi_stream.c @@ -152,8 +152,12 @@ setup_transfer(const char *req, const ch bool ok; int oob = 0; - while (!bs->eof) - bstream_next(bs); + while (!bs->eof) { + if (bstream_next(bs) < 0) { + msg = mnstr_peek_error(ws); + goto end; + } + } stream *rs = bs->s; assert(isa_block_stream(ws)); assert(isa_block_stream(rs)); diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c --- a/common/stream/socket_stream.c +++ b/common/stream/socket_stream.c @@ -126,7 +126,7 @@ socket_read(stream *restrict s, void *re struct pollfd pfd; pfd = (struct pollfd) {.fd = s->stream_data.s, - .events = POLLIN}; + .events = POLLIN | POLLPRI}; ret = poll(&pfd, 1, (int) s->timeout); if (ret == -1 && errno == EINTR) @@ -135,6 +135,20 @@ socket_read(stream *restrict s, void *re mnstr_set_error_errno(s, MNSTR_READ_ERROR, "poll error"); return -1; } + if (ret == 1 && pfd.revents & POLLPRI) { + char b = 0; + switch (recv(s->stream_data.s, &b, 1, MSG_OOB)) { + case 0: + /* unexpectedly didn't receive a byte */ + continue; + case 1: + mnstr_set_error(s, MNSTR_INTERRUPT, "query abort from client"); + return -1; + case -1: + mnstr_set_error_errno(s, MNSTR_READ_ERROR, "recv error"); + return -1; + } + } #else struct timeval tv; fd_set fds; diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c --- a/monetdb5/mal/mal_client.c +++ b/monetdb5/mal/mal_client.c @@ -107,11 +107,13 @@ MCpushClientInput(Client c, bstream *new ClientInput *x = (ClientInput *) GDKmalloc(sizeof(ClientInput)); if (x == 0) return -1; - x->fdin = c->fdin; - x->yycur = c->yycur; - x->listing = c->listing; - x->prompt = c->prompt; - x->next = c->bak; + *x = (ClientInput) { + .fdin = c->fdin, + .yycur = c->yycur, + .listing = c->listing, + .prompt = c->prompt, + .next = c->bak, + 
}; c->bak = x; c->fdin = new_input; c->qryctx.bs = new_input; @@ -567,7 +569,10 @@ MCreadClient(Client c) if (!in->mode) /* read one line at a time in line mode */ break; } - if (in->mode) { /* find last new line */ + if (rd < 0) { + /* force end of stream handling below */ + in->pos = in->len; + } else if (in->mode) { /* find last new line */ char *p = in->buf + in->len - 1; while (p > in->buf && *p != '\n') { diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c --- a/monetdb5/modules/mal/tablet.c +++ b/monetdb5/modules/mal/tablet.c @@ -601,8 +601,13 @@ tablet_read_more(READERtask *task) do { /* query is not finished ask for more */ /* we need more query text */ - if (bstream_next(in) < 0) + if (bstream_next(in) < 0) { + if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) { + task->aborted = true; + mnstr_clearerr(in->s); + } return false; + } if (in->eof) { if (bstream_getoob(in)) { task->aborted = true; @@ -612,7 +617,14 @@ tablet_read_more(READERtask *task) mnstr_flush(out, MNSTR_FLUSH_DATA); in->eof = false; /* we need more query text */ - if (bstream_next(in) <= 0) + if (bstream_next(in) < 0) { + if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) { + task->aborted = true; + mnstr_clearerr(in->s); + } + return false; + } + if (in->eof) return false; } } while (in->len <= in->pos); diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c --- a/sql/backends/monet5/sql.c +++ b/sql/backends/monet5/sql.c @@ -2633,8 +2633,12 @@ mvc_export_table_wrap( Client cntxt, Mal } be->output_format = OFMT_CSV; } else { - while (!m->scanner.rs->eof) - bstream_next(m->scanner.rs); + while (!m->scanner.rs->eof) { + if (bstream_next(m->scanner.rs) < 0) { + msg = createException(IO, "streams.open", "interrupted"); + goto wrapup_result_set1; + } + } s = m->scanner.ws; mnstr_write(s, PROMPT3, sizeof(PROMPT3) - 1, 1); mnstr_printf(s, "w %s\n", filename); @@ -2868,8 +2872,12 @@ mvc_export_row_wrap( Client cntxt, MalBl goto wrapup_result_set; } } else { - while 
(!m->scanner.rs->eof) - bstream_next(m->scanner.rs); + while (!m->scanner.rs->eof) { + if (bstream_next(m->scanner.rs) < 0) { + msg = createException(IO, "streams.open", "interrupted"); + goto wrapup_result_set; + } + } s = m->scanner.ws; mnstr_write(s, PROMPT3, sizeof(PROMPT3) - 1, 1); mnstr_printf(s, "w %s\n", filename); @@ -4310,7 +4318,8 @@ SQLhot_snapshot(Client cntxt, MalBlkPtr // sync with client, copy pasted from mvc_export_table_wrap while (!mvc->scanner.rs->eof) - bstream_next(mvc->scanner.rs); + if (bstream_next(mvc->scanner.rs) < 0) + throw(SQL, "sql.hot_snapshot", "interrupted"); // The snapshot code flushes from time to time. // Use a callback stream to suppress those. diff --git a/sql/server/sql_scan.c b/sql/server/sql_scan.c --- a/sql/server/sql_scan.c +++ b/sql/server/sql_scan.c @@ -714,9 +714,16 @@ scanner_read_more(struct scanner *lc, si more = true; } /* we need more query text */ - if (bstream_next(b) < 0 || - /* we asked for more data but didn't get any */ - (more && b->eof && b->len < b->pos + lc->yycur + n)) + if (bstream_next(b) < 0) { + if (mnstr_errnr(b->s) == MNSTR_INTERRUPT) { + // now what? + lc->errstr = "Query aborted"; + lc->aborted = true; + mnstr_clearerr(b->s); + } + return EOF; + } else if (/* we asked for more data but didn't get any */ + (more && b->eof && b->len < b->pos + lc->yycur + n)) return EOF; if (more && b->pos + lc->yycur + 2 == b->len && b->buf[b->pos + lc->yycur] == '\200' && b->buf[b->pos + lc->yycur + 1] == '\n') { lc->errstr = "Query aborted"; _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org