I've read the part of http://www.sqlite.org/cvstrac/wiki?p=MultiThreading that begins
"Case in point: a benchmark application I've written for this purpose"
and found that current releases of SQLite do not appear to behave in
the manner described there.  I cannot find any documentation that clearly
states the intended behavior.

It appears that if 2 threads start transactions at the same time,
both inserting into the same table, neither thread can finish until
one has rolled back.

The first thread's INSERTs all succeed, but its COMMIT returns BUSY.
The second thread gets BUSY on its very first INSERT and on every retry.
The attached test case retries on BUSY (up to 25x to prevent infinite
loops) when built with RETRY_BUSY=1; as the output below shows, that run
ultimately fails.  Built with RETRY_BUSY=0, it instead issues a ROLLBACK as
soon as BUSY is hit and restarts the transaction, and that run succeeds.
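
To make the suspected sequence concrete, here is a toy model of what the
output below suggests is happening.  It is only my guess, not SQLite code:
the lock levels and the model_* names are made up for this illustration, and
the assumption that a connection keeps its read lock after a failed INSERT is
inferred from the test output rather than confirmed anywhere.

```c
#include <stdbool.h>

/* Hypothetical lock levels for this sketch only (not SQLite's internals). */
enum lock_level { LOCK_NONE, LOCK_SHARED, LOCK_RESERVED };

#define MODEL_CONNS 2

/* One lock level per connection; connection ids are 0 and 1. */
struct model_db {
        enum lock_level held[MODEL_CONNS];
};

/* An INSERT first takes a read lock, then asks for the single write-intent
 * lock.  Returns false (think BUSY) if the other connection already has it.
 * Assumption: the read lock is kept even when the second step fails. */
static bool model_insert(struct model_db *db, int me)
{
        int other = 1 - me;

        if (db->held[me] == LOCK_NONE)
                db->held[me] = LOCK_SHARED;
        if (db->held[me] == LOCK_RESERVED)
                return true;
        if (db->held[other] == LOCK_RESERVED)
                return false;
        db->held[me] = LOCK_RESERVED;
        return true;
}

/* A COMMIT needs the file to itself.  Returns false (think BUSY) while the
 * other connection holds any lock at all; on success all locks are dropped. */
static bool model_commit(struct model_db *db, int me)
{
        int other = 1 - me;

        if (db->held[other] != LOCK_NONE)
                return false;
        db->held[me] = LOCK_NONE;
        return true;
}

/* A ROLLBACK drops whatever the connection was holding. */
static void model_rollback(struct model_db *db, int me)
{
        db->held[me] = LOCK_NONE;
}
```

In this model, once connection 0 has inserted, model_insert(&db, 1) and
model_commit(&db, 0) both keep failing no matter how often they are retried.
Only after model_rollback(&db, 1) does model_commit(&db, 0) go through, which
matches the two runs shown below.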

Is this the intended behavior?  At least one release at some point in
time apparently did not behave this way (Jan 10, 2003 from the Wiki).  Since
the second thread never gets a successful response to any INSERT statement,
I would not expect it to be holding a lock on that table that prevents the
first thread's COMMIT from succeeding, yet it evidently is.  That looks like
a bug to me.

I have attached the test case.

Any insight would be appreciated.


Here are the results for both scenarios (RETRY_BUSY=1 retries on BUSY;
RETRY_BUSY=0 issues a ROLLBACK on BUSY):

$ gcc -Wall -D RETRY_BUSY=1 -W -o sqlitetest sqlitetest.c -l sqlite3
$ ./sqlitetest
Creating a table
0 => Executing: CREATE TABLE test_table(threadnum INT, cnt INT, testcol TEXT)
0 => started ....
1 => started ....
all threads started
0 => Executing: BEGIN TRANSACTION
1 => Executing: BEGIN TRANSACTION
0 => Executing: INSERT INTO test_table VALUES(0, 0, 'test0_0')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: INSERT INTO test_table VALUES(0, 1, 'test0_1')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: INSERT INTO test_table VALUES(0, 2, 'test0_2')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: INSERT INTO test_table VALUES(0, 3, 'test0_3')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: INSERT INTO test_table VALUES(0, 4, 'test0_4')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: INSERT INTO test_table VALUES(0, 5, 'test0_5')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: INSERT INTO test_table VALUES(0, 6, 'test0_6')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: INSERT INTO test_table VALUES(0, 7, 'test0_7')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: INSERT INTO test_table VALUES(0, 8, 'test0_8')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: INSERT INTO test_table VALUES(0, 9, 'test0_9')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => BUSY
0 => Executing: COMMIT
0 => BUSY
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => MAX BUSY CNT
1 => thread failed ...
0 => Executing: COMMIT
0 => finished.
exiting...(test failed)


$ gcc -Wall -D RETRY_BUSY=0 -W -o sqlitetest sqlitetest.c -l sqlite3
$ ./sqlitetest
Creating a table
0 => Executing: CREATE TABLE test_table(threadnum INT, cnt INT, testcol TEXT)
0 => started ....
1 => started ....
all threads started
0 => Executing: BEGIN TRANSACTION
1 => Executing: BEGIN TRANSACTION
0 => Executing: INSERT INTO test_table VALUES(0, 0, 'test0_0')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
0 => Executing: INSERT INTO test_table VALUES(0, 1, 'test0_1')
1 => Executing: ROLLBACK
1 => ROLLBACK succeeded
1 => Retrying transaction ...
0 => Executing: INSERT INTO test_table VALUES(0, 2, 'test0_2')
1 => Executing: BEGIN TRANSACTION
0 => Executing: INSERT INTO test_table VALUES(0, 3, 'test0_3')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
0 => Executing: INSERT INTO test_table VALUES(0, 4, 'test0_4')
1 => Executing: ROLLBACK
1 => ROLLBACK succeeded
1 => Retrying transaction ...
0 => Executing: INSERT INTO test_table VALUES(0, 5, 'test0_5')
1 => Executing: BEGIN TRANSACTION
0 => Executing: INSERT INTO test_table VALUES(0, 6, 'test0_6')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
0 => Executing: INSERT INTO test_table VALUES(0, 7, 'test0_7')
1 => Executing: ROLLBACK
1 => ROLLBACK succeeded
1 => Retrying transaction ...
0 => Executing: INSERT INTO test_table VALUES(0, 8, 'test0_8')
1 => Executing: BEGIN TRANSACTION
0 => Executing: INSERT INTO test_table VALUES(0, 9, 'test0_9')
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
0 => Executing: COMMIT
0 => finished.
1 => Executing: ROLLBACK
1 => ROLLBACK succeeded
1 => Retrying transaction ...
1 => Executing: BEGIN TRANSACTION
1 => Executing: INSERT INTO test_table VALUES(1, 0, 'test1_0')
1 => Executing: INSERT INTO test_table VALUES(1, 1, 'test1_1')
1 => Executing: INSERT INTO test_table VALUES(1, 2, 'test1_2')
1 => Executing: INSERT INTO test_table VALUES(1, 3, 'test1_3')
1 => Executing: INSERT INTO test_table VALUES(1, 4, 'test1_4')
1 => Executing: INSERT INTO test_table VALUES(1, 5, 'test1_5')
1 => Executing: INSERT INTO test_table VALUES(1, 6, 'test1_6')
1 => Executing: INSERT INTO test_table VALUES(1, 7, 'test1_7')
1 => Executing: INSERT INTO test_table VALUES(1, 8, 'test1_8')
1 => Executing: INSERT INTO test_table VALUES(1, 9, 'test1_9')
1 => Executing: COMMIT
1 => finished.
exiting...(test succeeded)



Thanks!
-Brad
/* Feature-test macros must be defined before any system header is included */
#ifdef __linux__
#  define _XOPEN_SOURCE 500
#endif

#include <stdio.h>
#include <sqlite3.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <pthread.h>
#include <unistd.h>

#define SQLITEDB "test.db"
#define NUM_THREADS 2
#define MAX_BUSY 25
#define NUM_INSERTS 10

#ifndef RETRY_BUSY
#define RETRY_BUSY 1
#endif

static void sync_thread(int thnum);

typedef struct sql_result {
        int alloc_rows;
        int rows;
        int cols;
        char ***data;
} sql_result;

typedef struct th_st {
        int th_num;
        /* 0 = not started, 1 = started, 2 = done, 3 = error */
        int state;
} th_st;

th_st *th_data = NULL;

pthread_mutex_t global_mutex;
int on_thread = 0;

#define SQLITE_ALLOC_ROWS 1024
static int sqlite3_rowcallback(void *myresult, int argc, char **argv,
                               char **colName)
{
        sql_result *result = myresult;
        int i;
        (void)colName;
        if (result == NULL)
                return (SQLITE_ERROR);
        if (result->cols == 0)
                result->cols=argc;
        else if (result->cols != argc)
                return (SQLITE_ERROR);

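        if (result->rows >= result->alloc_rows) {
                /* Grow via a temporary so the old block is not lost on failure */
                char ***grown = realloc(result->data,
                        (result->alloc_rows + SQLITE_ALLOC_ROWS) * sizeof(char **));
                if (grown == NULL)
                        return (SQLITE_NOMEM);
                result->data = grown;
                result->alloc_rows += SQLITE_ALLOC_ROWS;
        }

        result->data[result->rows] = malloc(result->cols * sizeof(char *));
        if (result->data[result->rows] == NULL)
                return (SQLITE_NOMEM);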
        for (i=0; i<result->cols; i++) {
                if (argv[i] == NULL)
                        result->data[result->rows][i] = NULL;
                else
                        result->data[result->rows][i] = strdup(argv[i]);
        }
        result->rows++;
        return(SQLITE_OK);
}

static void sqlite3_printresult(sql_result *result)
{
        int i, j;
        if (!result->rows) return;

        printf("Query Results:\n");
        for (i=0; i<result->rows; i++) {
                for (j=0; j<result->cols; j++) {
                        if (j != 0)
                                printf(",");
                        /* SQL NULLs are stored as NULL pointers; print a marker */
                        printf("%s", result->data[i][j] ? result->data[i][j] : "NULL");
                }
                printf("\n");
        }
        printf("\n");
}

static void sqlite3_freeresult(sql_result *result)
{
        int i, j;
        if (!result->rows) return;
        for (i=0; i<result->rows; i++) {
                for (j=0; j<result->cols; j++) {
                        free(result->data[i][j]);
                }
                free(result->data[i]);
        }
        free(result->data);
}

/** sqlite3_execute
 *  Description: Execute SQL statement, loop on busy
 *  Return     : 1 on success, 0 on failure, -1 on rollback/concurrency issue
 */
static int sqlite3_execute(sqlite3 *db_conn, const char *statement,
                           void (*sync_thread_p)(int), int thnum)
{
        int rc, retval = 0;
        sql_result result;
        char *errmsg = NULL;
#if RETRY_BUSY
        int busy_cnt = 0;
#endif

        memset(&result, 0, sizeof(result));

        do {
                errmsg = NULL;
                if (sync_thread_p != NULL)
                        (*sync_thread_p)(thnum);
                printf("%d => Executing: %s\n", thnum, statement);
                rc = sqlite3_exec(db_conn, statement, sqlite3_rowcallback,
                                  &result, &errmsg);
                switch (rc) {
                        case SQLITE_OK:
                                retval = 1;
                                break;
                        case SQLITE_BUSY:
#if RETRY_BUSY
                                busy_cnt++;
                                break;
#endif
                                /* With RETRY_BUSY=0, BUSY falls through and is
                                 * handled like LOCKED: roll back, return -1 */
                        case SQLITE_LOCKED:
                                /* Issue a rollback if not in autocommit mode */
                                if (!sqlite3_get_autocommit(db_conn)) {
                                        if (sqlite3_execute(db_conn, "ROLLBACK",
                                                        sync_thread_p, thnum) != 1) {
                                                printf("%d => ROLLBACK failed\n", thnum);
                                        } else {
                                                printf("%d => ROLLBACK succeeded\n", thnum);
                                        }
                                }
                                retval = -1;
                                break;
                        default:
                                printf("%d => query failed: %s\n", thnum,
                                       errmsg ? errmsg : "(no message)");
                                retval = 0;
                                break;
                }

                if (errmsg != NULL) {
                        sqlite3_free(errmsg);
                        errmsg = NULL;
                }

#if RETRY_BUSY
                /* Don't loop too fast */
                if (rc == SQLITE_BUSY) {
                        if (busy_cnt > MAX_BUSY) {
                                rc = SQLITE_ERROR;
                                retval = 0;
                                printf("%d => MAX BUSY CNT\n", thnum);
                        } else {
                                printf("%d => BUSY\n", thnum);
                                usleep(10000);
                        }
                }
        } while (rc == SQLITE_BUSY);
#else
        } while (0); 
#endif

        sqlite3_printresult(&result);
        sqlite3_freeresult(&result);

        return(retval);
}

static void sync_thread(int thnum)
{
        /* Unassign from self, and assign to next thread */
        pthread_mutex_lock(&global_mutex);
        if (on_thread == thnum)
                on_thread++;
        pthread_mutex_unlock(&global_mutex);

        while(1) {
                pthread_mutex_lock(&global_mutex);

                /* Thread could disappear */
                if (on_thread < NUM_THREADS && th_data[on_thread].state >= 2)
                        on_thread++;

                if (on_thread >= NUM_THREADS)
                        on_thread = 0;
                
                if (on_thread == thnum) {
                        pthread_mutex_unlock(&global_mutex);
                        break;
                }
                pthread_mutex_unlock(&global_mutex);
                usleep(20000);
        }
}



void *testthread(void *arg)
{
        int rc;
        int rollback = 0;
        sqlite3 *db_conn = NULL;
        th_st *th = arg;
        char temp[255];
        int i;
        
        rc = sqlite3_open(SQLITEDB , &db_conn);
        if (rc) {
                printf("%d => sqlite3_open() failed: %s\n", th->th_num,
                       sqlite3_errmsg(db_conn));
                sqlite3_close(db_conn);
                th->state = 3;
                return(NULL);
        }

        printf("%d => started ....\n", th->th_num);
        th->state = 1;

        /* We want to synchronize accesses by threads, just to see what happens */
        do {
                if (rollback) {
                        printf("%d => Retrying transaction ...\n", th->th_num);
                        usleep(50000);
                }
                rollback = 0;
                
                rc = sqlite3_execute(db_conn, "BEGIN TRANSACTION", sync_thread,
                                     th->th_num);
                if (rc == -1) {
                        rollback = 1;
                        continue;
                } else if (rc == 0)
                        break;

                for (i=0; i<NUM_INSERTS; i++) {
                        snprintf(temp, sizeof(temp),
                                 "INSERT INTO test_table VALUES(%d, %d, 'test%d_%d')",
                                 th->th_num, i, th->th_num, i);
                        rc = sqlite3_execute(db_conn, temp, sync_thread,
                                             th->th_num);
                        if (rc == -1) {
                                rollback = 1;
                                break;
                        } else if (rc == 0)
                                break;
                }
                if (rc == 0) break;
                if (rollback) continue;

                rc = sqlite3_execute(db_conn, "COMMIT", sync_thread,
                                     th->th_num);
                if (rc == -1) {
                        rollback = 1;
                        continue;
                } else if (rc == 0)
                        break;

        } while (rollback);

        sqlite3_close(db_conn);

        if (rc == 0) {
                printf("%d => thread failed ...\n", th->th_num);
                th->state = 3;
        } else {
                th->state = 2;
                printf("%d => finished.\n", th->th_num);
        }
        return(NULL);
}

int main()
{
        sqlite3 *db_conn = NULL;
        int rc, i;
        int retval = 0;
        struct stat myst;
        int trigger = 0;
        pthread_t thread;

        /* If the database already exists, let's go ahead and delete it */
        if (stat(SQLITEDB, &myst) != -1) {
                unlink(SQLITEDB);
        }

        rc = sqlite3_open(SQLITEDB , &db_conn);
        if (rc) {
                printf("sqlite3_open() failed: %s\n", sqlite3_errmsg(db_conn));
                sqlite3_close(db_conn);
                return(1);
        }

        /* Pre-create a table */
        printf("Creating a table\n");
        rc = sqlite3_execute(db_conn,
                "CREATE TABLE test_table(threadnum INT, cnt INT, testcol TEXT)",
                NULL, 0);
        if (rc != 1) {
                printf("failed to create table\n");
                sqlite3_close(db_conn);
                return(1);
        }

        sqlite3_close(db_conn);

        pthread_mutex_init(&global_mutex, NULL);
        pthread_mutex_lock(&global_mutex);

        /* Start all threads */
        th_data = malloc(sizeof(*th_data)*NUM_THREADS);
        for (i=0; i<NUM_THREADS; i++) {
                th_data[i].th_num = i;
                th_data[i].state = 0;
                pthread_create(&thread, NULL, testthread, &(th_data[i]));
                pthread_detach(thread);
        }
                
        /* Wait for all threads to start, then release mutex */
        trigger = 0;
        while (!trigger) {
                trigger = 1;
                for (i=0; i<NUM_THREADS; i++)
                        if (!th_data[i].state)
                                trigger = 0;

                usleep(50000);
        }
        printf("all threads started\n");
        pthread_mutex_unlock(&global_mutex);

        /* Wait for all threads to finish, then cleanup and exit */
        trigger = 0;
        while (!trigger) {
                trigger = 1;
                for (i=0; i<NUM_THREADS; i++)
                        if (th_data[i].state < 2) {
                                trigger = 0;
                        } else if (th_data[i].state > 2) {
                                retval = 1;
                        }

                usleep(50000);
        }

        pthread_mutex_destroy(&global_mutex);
        free(th_data);
        printf("exiting...(test %s)\n", (retval==0)?"succeeded":"failed");
        return(retval);
}

