Changeset: 9a93a50925ef for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/9a93a50925ef
Modified Files:
        common/stream/mapi_stream.c
        common/stream/socket_stream.c
        monetdb5/mal/mal_client.c
        monetdb5/modules/mal/tablet.c
        sql/backends/monet5/sql.c
        sql/server/sql_scan.c
Branch: Aug2024
Log Message:

Fix a problem where interrupting a query halfway on the client didn't work properly.


diffs (185 lines):

diff --git a/common/stream/mapi_stream.c b/common/stream/mapi_stream.c
--- a/common/stream/mapi_stream.c
+++ b/common/stream/mapi_stream.c
@@ -152,8 +152,12 @@ setup_transfer(const char *req, const ch
        bool ok;
        int oob = 0;
 
-       while (!bs->eof)
-               bstream_next(bs);
+       while (!bs->eof) {
+               if (bstream_next(bs) < 0) {
+                       msg = mnstr_peek_error(ws);
+                       goto end;
+               }
+       }
        stream *rs = bs->s;
        assert(isa_block_stream(ws));
        assert(isa_block_stream(rs));
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -126,7 +126,7 @@ socket_read(stream *restrict s, void *re
                        struct pollfd pfd;
 
                        pfd = (struct pollfd) {.fd = s->stream_data.s,
-                                              .events = POLLIN};
+                                              .events = POLLIN | POLLPRI};
 
                        ret = poll(&pfd, 1, (int) s->timeout);
                        if (ret == -1 && errno == EINTR)
@@ -135,6 +135,20 @@ socket_read(stream *restrict s, void *re
                                mnstr_set_error_errno(s, MNSTR_READ_ERROR, "poll error");
                                return -1;
                        }
+                       if (ret == 1 && pfd.revents & POLLPRI) {
+                               char b = 0;
+                               switch (recv(s->stream_data.s, &b, 1, MSG_OOB)) {
+                               case 0:
+                                       /* unexpectedly didn't receive a byte */
+                                       continue;
+                               case 1:
+                                       mnstr_set_error(s, MNSTR_INTERRUPT, "query abort from client");
+                                       return -1;
+                               case -1:
+                                       mnstr_set_error_errno(s, MNSTR_READ_ERROR, "recv error");
+                                       return -1;
+                               }
+                       }
 #else
                        struct timeval tv;
                        fd_set fds;
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -107,11 +107,13 @@ MCpushClientInput(Client c, bstream *new
        ClientInput *x = (ClientInput *) GDKmalloc(sizeof(ClientInput));
        if (x == 0)
                return -1;
-       x->fdin = c->fdin;
-       x->yycur = c->yycur;
-       x->listing = c->listing;
-       x->prompt = c->prompt;
-       x->next = c->bak;
+       *x = (ClientInput) {
+               .fdin = c->fdin,
+               .yycur = c->yycur,
+               .listing = c->listing,
+               .prompt = c->prompt,
+               .next = c->bak,
+       };
        c->bak = x;
        c->fdin = new_input;
        c->qryctx.bs = new_input;
@@ -567,7 +569,10 @@ MCreadClient(Client c)
                        if (!in->mode)          /* read one line at a time in line mode */
                                break;
                }
-               if (in->mode) {                 /* find last new line */
+               if (rd < 0) {
+                       /* force end of stream handling below */
+                       in->pos = in->len;
+               } else if (in->mode) {                  /* find last new line */
                        char *p = in->buf + in->len - 1;
 
                        while (p > in->buf && *p != '\n') {
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -601,8 +601,13 @@ tablet_read_more(READERtask *task)
                do {
                        /* query is not finished ask for more */
                        /* we need more query text */
-                       if (bstream_next(in) < 0)
+                       if (bstream_next(in) < 0) {
+                               if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
+                                       task->aborted = true;
+                                       mnstr_clearerr(in->s);
+                               }
                                return false;
+                       }
                        if (in->eof) {
                                if (bstream_getoob(in)) {
                                        task->aborted = true;
@@ -612,7 +617,14 @@ tablet_read_more(READERtask *task)
                                        mnstr_flush(out, MNSTR_FLUSH_DATA);
                                in->eof = false;
                                /* we need more query text */
-                               if (bstream_next(in) <= 0)
+                               if (bstream_next(in) < 0) {
+                                       if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
+                                               task->aborted = true;
+                                               mnstr_clearerr(in->s);
+                                       }
+                                       return false;
+                               }
+                               if (in->eof)
                                        return false;
                        }
                } while (in->len <= in->pos);
diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -2633,8 +2633,12 @@ mvc_export_table_wrap( Client cntxt, Mal
                }
                be->output_format = OFMT_CSV;
        } else {
-               while (!m->scanner.rs->eof)
-                       bstream_next(m->scanner.rs);
+               while (!m->scanner.rs->eof) {
+                       if (bstream_next(m->scanner.rs) < 0) {
+                               msg = createException(IO, "streams.open", "interrupted");
+                               goto wrapup_result_set1;
+                       }
+               }
                s = m->scanner.ws;
                mnstr_write(s, PROMPT3, sizeof(PROMPT3) - 1, 1);
                mnstr_printf(s, "w %s\n", filename);
@@ -2868,8 +2872,12 @@ mvc_export_row_wrap( Client cntxt, MalBl
                        goto wrapup_result_set;
                }
        } else {
-               while (!m->scanner.rs->eof)
-                       bstream_next(m->scanner.rs);
+               while (!m->scanner.rs->eof) {
+                       if (bstream_next(m->scanner.rs) < 0) {
+                               msg = createException(IO, "streams.open", "interrupted");
+                               goto wrapup_result_set;
+                       }
+               }
                s = m->scanner.ws;
                mnstr_write(s, PROMPT3, sizeof(PROMPT3) - 1, 1);
                mnstr_printf(s, "w %s\n", filename);
@@ -4310,7 +4318,8 @@ SQLhot_snapshot(Client cntxt, MalBlkPtr 
 
        // sync with client, copy pasted from mvc_export_table_wrap
        while (!mvc->scanner.rs->eof)
-               bstream_next(mvc->scanner.rs);
+               if (bstream_next(mvc->scanner.rs) < 0)
+                       throw(SQL, "sql.hot_snapshot", "interrupted");
 
        // The snapshot code flushes from time to time.
        // Use a callback stream to suppress those.
diff --git a/sql/server/sql_scan.c b/sql/server/sql_scan.c
--- a/sql/server/sql_scan.c
+++ b/sql/server/sql_scan.c
@@ -714,9 +714,16 @@ scanner_read_more(struct scanner *lc, si
                        more = true;
                }
                /* we need more query text */
-               if (bstream_next(b) < 0 ||
-                   /* we asked for more data but didn't get any */
-                   (more && b->eof && b->len < b->pos + lc->yycur + n))
+               if (bstream_next(b) < 0) {
+                       if (mnstr_errnr(b->s) == MNSTR_INTERRUPT) {
+                               // now what?
+                               lc->errstr = "Query aborted";
+                               lc->aborted = true;
+                               mnstr_clearerr(b->s);
+                       }
+                       return EOF;
+               } else if (/* we asked for more data but didn't get any */
+                          (more && b->eof && b->len < b->pos + lc->yycur + n))
                        return EOF;
                if (more && b->pos + lc->yycur + 2 == b->len && b->buf[b->pos + lc->yycur] == '\200' && b->buf[b->pos + lc->yycur + 1] == '\n') {
                        lc->errstr = "Query aborted";
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to