Hi

Please see attached file: I added method testConcurrentMergeDeadlock() to 
existing test.
This test failed with error
...
Caused by: java.lang.IllegalStateException: Entry is locked [1.4.193/101]
at org.h2.mvstore.DataUtils.newIllegalStateException(DataUtils.java:766)


The same test with UPDATE instead of MERGE works fine.
 
Looks like MERGE removes lock previously acquired by SELECT FOR UPDATE


-- 
You received this message because you are subscribed to the Google Groups "H2 
Database" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to h2-database+unsubscr...@googlegroups.com.
To post to this group, send email to h2-database@googlegroups.com.
Visit this group at https://groups.google.com/group/h2-database.
For more options, visit https://groups.google.com/d/optout.
/*
 * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
 * and the EPL 1.0 (http://h2database.com/html/license.html).
 * Initial Developer: H2 Group
 */
package org.h2.test.mvcc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.CountDownLatch;

import org.h2.api.ErrorCode;
import org.h2.test.TestBase;
import org.h2.util.Task;

/**
 * Multi-threaded MVCC (multi version concurrency) test cases.
 */
public class TestMvccMultiThreaded extends TestBase {

    /**
     * Run just this test.
     *
     * @param a ignored
     */
    public static void main(String... a) throws Exception {
        TestBase.createCaller().init().test();
    }

    @Override
    public void test() throws Exception {
        testMergeWithUniqueKeyViolation();
        // not supported currently
        if (!config.multiThreaded) {
            testConcurrentMerge();
            testConcurrentUpdate();
            testConcurrentMergeDeadlock();
        }
    }

    private void testMergeWithUniqueKeyViolation() throws Exception {
        deleteDb(getTestName());
        Connection conn = getConnection(getTestName());
        Statement stat = conn.createStatement();
        stat.execute("create table test(x int primary key, y int unique)");
        stat.execute("insert into test values(1, 1)");
        assertThrows(ErrorCode.DUPLICATE_KEY_1, stat).
                execute("merge into test values(2, 1)");
        stat.execute("merge into test values(1, 2)");
        conn.close();

    }

    private void testConcurrentMerge() throws Exception {
        deleteDb(getTestName());
        int len = 3;
        final Connection[] connList = new Connection[len];
        for (int i = 0; i < len; i++) {
            Connection conn = getConnection(
                    getTestName() + ";MVCC=TRUE;LOCK_TIMEOUT=500");
            connList[i] = conn;
        }
        Connection conn = connList[0];
        conn.createStatement().execute(
                "create table test(id int primary key, name varchar)");
        Task[] tasks = new Task[len];
        final boolean[] stop = { false };
        for (int i = 0; i < len; i++) {
            final Connection c = connList[i];
            c.setAutoCommit(false);
            tasks[i] = new Task() {
                @Override
                public void call() throws Exception {
                    while (!stop) {
                        c.createStatement().execute(
                                "merge into test values(1, 'x')");
                        c.commit();
                        Thread.sleep(1);
                    }
                }
            };
            tasks[i].execute();
        }
        Thread.sleep(1000);
        stop[0] = true;
        for (int i = 0; i < len; i++) {
            tasks[i].get();
        }
        for (int i = 0; i < len; i++) {
            connList[i].close();
        }
        deleteDb(getTestName());
    }

    private void testConcurrentUpdate() throws Exception {
        deleteDb(getTestName());
        int len = 2;
        final Connection[] connList = new Connection[len];
        for (int i = 0; i < len; i++) {
            connList[i] = getConnection(
                    getTestName() + ";MVCC=TRUE");
        }
        Connection conn = connList[0];
        conn.createStatement().execute(
                "create table test(id int primary key, value int)");
        conn.createStatement().execute(
                "insert into test values(0, 0)");
        final int count = 1000;
        Task[] tasks = new Task[len];

        final CountDownLatch latch = new CountDownLatch(len);

        for (int i = 0; i < len; i++) {
            final int x = i;
            tasks[i] = new Task() {
                @Override
                public void call() throws Exception {
                    for (int a = 0; a < count; a++) {
                        connList[x].createStatement().execute(
                                "update test set value=value+1");
                        latch.countDown();
                        latch.await();
                    }
                }
            };
            tasks[i].execute();
        }
        for (int i = 0; i < len; i++) {
            tasks[i].get();
        }
        ResultSet rs = conn.createStatement().executeQuery("select value from test");
        rs.next();
        assertEquals(count * len, rs.getInt(1));
        for (int i = 0; i < len; i++) {
            connList[i].close();
        }
    }

    private void testConcurrentMergeDeadlock() throws Exception {
        deleteDb(getTestName());
        int len = 5;
        final Connection[] connList = new Connection[len];
        for (int i = 0; i < len; i++) {
            Connection conn = getConnection(
                    getTestName() + ";MVCC=TRUE;LOCK_TIMEOUT=500");
            conn.setAutoCommit(false);
            connList[i] = conn;
        }
        Connection conn = connList[0];
        conn.createStatement().execute(
                "create table test1(id int primary key, name varchar)");
        conn.createStatement().execute(
                "create table test2(id int primary key, name varchar)");
        Task[] tasks = new Task[len];
        final boolean[] stop = { false };
        for (int i = 0; i < len; i++) {
            final Connection c = connList[i];
            c.setAutoCommit(false);
            tasks[i] = new Task() {
                @Override
                public void call() throws Exception {
                    while (!stop) {
                        String t1,t2;
                        if (Math.random() < 0.5) {
                            t1="1"; t2="2";
                        } else {
                            t2="1"; t1="2";
                        } 
                        c.createStatement().execute(
                                "select id from test1 where id=1 for update");
                        c.createStatement().execute(
                                "select id from test2 where id=1 for update");
                        c.createStatement().execute("merge into test"+t1+" values(1, 'x')");
                        c.createStatement().execute("merge into test"+t2+" values(1, 'x')");
                        Thread.sleep(5);
                        c.commit();
                        Thread.sleep(5);
                    }
                }
            };
            tasks[i].execute();
        }
        Thread.sleep(10000);
        stop[0] = true;
        for (int i = 0; i < len; i++) {
            tasks[i].get();
        }
        for (int i = 0; i < len; i++) {
            connList[i].close();
        }
        deleteDb(getTestName());
    }
}

Reply via email to