Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/incubator-omid/pull/46#discussion_r224179421
  
    --- Diff: 
hbase-client/src/test/java/org/apache/omid/transaction/TestOmidLLRaces.java ---
    @@ -0,0 +1,243 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.omid.transaction;
    +
    +
    +import static com.google.common.base.Charsets.UTF_8;
    +import static 
org.apache.omid.committable.hbase.HBaseCommitTableConfig.DEFAULT_COMMIT_TABLE_CF_NAME;
    +import static org.mockito.Mockito.doAnswer;
    +import static org.mockito.Mockito.spy;
    +import static org.testng.Assert.assertFalse;
    +import static org.testng.Assert.assertTrue;
    +import static org.mockito.Matchers.any;
    +
    +import com.google.common.base.Optional;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.omid.committable.CommitTable;
    +import org.apache.omid.committable.hbase.KeyGenerator;
    +import org.apache.omid.committable.hbase.KeyGeneratorImplementations;
    +import org.apache.omid.metrics.NullMetricsProvider;
    +import org.apache.omid.tso.client.OmidClientConfiguration;
    +import org.apache.omid.tso.client.TSOClient;
    +import org.apache.omid.tso.client.TSOFuture;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.testng.ITestContext;
    +import org.testng.annotations.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Set;
    +import java.util.concurrent.CountDownLatch;
    +
    +
    +@Test(groups = "sharedHBase")
    +public class TestOmidLLRaces extends OmidTestBase {
    +
    +    private static final byte[] row1 = Bytes.toBytes("test-is-committed1");
    +    private static final byte[] row2 = Bytes.toBytes("test-is-committed2");
    +    private static final byte[] family = Bytes.toBytes(TEST_FAMILY);
    +    private static final byte[] qualifier = Bytes.toBytes("testdata");
    +    private static final byte[] data1 = Bytes.toBytes("testWrite-1");
    +
    +    private static final Logger LOG = 
LoggerFactory.getLogger(TestOmidLLRaces.class);
    +    @Override
    +    protected boolean isLowLatency() {
    +        return true;
    +    }
    +
    +    @Test(timeOut = 30_000)
    +    public void testIsCommitted(ITestContext context) throws Exception {
    +        AbstractTransactionManager tm = 
(AbstractTransactionManager)newTransactionManagerHBaseCommitTable(getClient(context));
    +
    +        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
    +        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new 
HTableAccessWrapper(htable, htable),
    +                tm.getCommitTableClient());
    +        TTable table = spy(new TTable(htable, snapshotFilter, false));
    +
    +        HBaseTransaction t1 = (HBaseTransaction) tm.begin();
    +
    +        Put put = new Put(row1);
    +        put.addColumn(family, qualifier, data1);
    +        table.put(t1, put);
    +        tm.commit(t1);
    +
    +        HBaseTransaction t2 = (HBaseTransaction) tm.begin();
    +        put = new Put(row2);
    +        put.addColumn(family, qualifier, data1);
    +        table.put(t2, put);
    +        table.flushCommits();
    +
    +        HBaseTransaction t3 = (HBaseTransaction) tm.begin();
    +        put = new Put(row2);
    +        put.addColumn(family, qualifier, data1);
    +        table.put(t3, put);
    +        tm.commit(t3);
    +
    +        HBaseCellId hBaseCellId1 = new HBaseCellId(table, row1, family, 
qualifier, t1.getStartTimestamp());
    +        HBaseCellId hBaseCellId2 = new HBaseCellId(table, row2, family, 
qualifier, t2.getStartTimestamp());
    +        HBaseCellId hBaseCellId3 = new HBaseCellId(table, row2, family, 
qualifier, t3.getStartTimestamp());
    +
    +        assertTrue(snapshotFilter.isCommitted(hBaseCellId1, 0, false), 
"row1 should be committed");
    +        assertFalse(snapshotFilter.isCommitted(hBaseCellId2, 0, false), 
"row2 should not be committed for kv2");
    +        assertTrue(snapshotFilter.isCommitted(hBaseCellId3, 0, false), 
"row2 should be committed for kv3");
    +        assertTrue(tm.isLowLatency());
    +    }
    +
    +
    +    @Test(timeOut = 30_000)
    +    public void testInvalidation(ITestContext context) throws Exception {
    +        AbstractTransactionManager tm = 
(AbstractTransactionManager)newTransactionManagerHBaseCommitTable(getClient(context));
    +
    +        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
    +        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new 
HTableAccessWrapper(htable, htable),
    +                tm.getCommitTableClient());
    +        TTable table = spy(new TTable(htable, snapshotFilter, false));
    +
    +        HBaseTransaction t1 = (HBaseTransaction) tm.begin();
    +        Put put = new Put(row1);
    +        put.addColumn(family, qualifier, data1);
    +        table.put(t1, put);
    +
    +        HBaseTransaction t2 = (HBaseTransaction) tm.begin();
    +        Get get = new Get(row1);
    +        get.addColumn(family, qualifier);
    +        table.get(t2,get);
    +
    +        //assert there is an invalidation marker:
    +        Table commitTable = 
connection.getTable(TableName.valueOf("OMID_COMMIT_TABLE"));
    +        KeyGenerator keygen = 
KeyGeneratorImplementations.defaultKeyGenerator();
    +        byte[] row = keygen.startTimestampToKey(t1.getStartTimestamp());
    +        Get getInvalidation = new Get(row);
    +        
getInvalidation.addColumn(Bytes.toBytes(DEFAULT_COMMIT_TABLE_CF_NAME),"IT".getBytes(UTF_8));
    +        Result res = commitTable.get(getInvalidation);
    +        int val = 
Bytes.toInt(res.getValue(Bytes.toBytes(DEFAULT_COMMIT_TABLE_CF_NAME), 
"IT".getBytes(UTF_8)));
    +        assertTrue(val == 1);
    +
    +        boolean gotInvalidated = false;
    +        try {
    +            tm.commit(t1);
    +        } catch (RollbackException e) {
    +            gotInvalidated = true;
    +        }
    +        assertTrue(gotInvalidated);
    +        tm.commit(t2);
    +        Thread.sleep(1000);
    +        res = commitTable.get(getInvalidation);
    +        assertTrue(res.isEmpty());
    +        assertTrue(tm.isLowLatency());
    +    }
    +
    +//    @Test(timeOut = 30_000000)
    +//    public void testReadRace(ITestContext context) throws Exception {
    +//
    +//        final CountDownLatch waitForCommit = new CountDownLatch(1);
    +//        final CountDownLatch latch1 = new CountDownLatch(1);
    +//        final CountDownLatch latch2 = new CountDownLatch(1);
    +//        final Integer[] shadowCellReads = {0};
    +//
    +//        AbstractTransactionManager tm = 
(AbstractTransactionManager)newTransactionManager(context);
    +//        Table htable = 
connection.getTable(TableName.valueOf(TEST_TABLE));
    +//        SnapshotFilterImpl snapshotFilter = spy(new 
SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
    +//                tm.getCommitTableClient()));
    +//        TTable table = spy(new TTable(htable, snapshotFilter, false));
    +//
    +//        doAnswer(new Answer<Optional<CommitTable.CommitTimestamp>>() {
    +//            @Override
    +//            public Optional<CommitTable.CommitTimestamp> 
answer(InvocationOnMock invocation) throws Throwable {
    +//                shadowCellReads[0]++;
    +//                if (shadowCellReads[0] == 1) {
    +//                    Optional<CommitTable.CommitTimestamp> result = 
(Optional<CommitTable.CommitTimestamp>) invocation.callRealMethod();
    +//                    latch1.countDown();
    +//                    latch2.await();
    +//                    return result;
    +//                } else {
    +//                    Optional<CommitTable.CommitTimestamp> result = 
(Optional<CommitTable.CommitTimestamp>) invocation.callRealMethod();
    +//                    latch1.countDown();
    +//                    latch2.await();
    +//                    return result;
    +//                }
    +//            }
    +//        
}).when(snapshotFilter).readCommitTimestampFromShadowCell(any(long.class), 
any(CommitTimestampLocator.class));
    +//
    +//        doAnswer(new Answer<TSOFuture<Long>>() {
    +//            @Override
    +//            public TSOFuture<Long> answer(InvocationOnMock 
invocationOnMock) throws Throwable {
    +//
    +//                TSOFuture<Long> res = 
(TSOFuture<Long>)invocationOnMock.callRealMethod();
    +//
    +//                LOG.info("writer thread commit ts {}",1);
    +//                waitForCommit.countDown();
    +//                return res;
    +//            }
    +//        }).when(client).commit(any(long.class), any(Set.class));
    +//
    +//        Thread readThread = new Thread("Read Thread") {
    +//            @Override
    +//            public void run() {
    +//                try {
    +//                    waitForCommit.await();
    +//                    HBaseTransaction t2 = (HBaseTransaction) tm.begin();
    +//                    LOG.info("reader thread ts {}", 
t2.getStartTimestamp());
    +//                    Get get = new Get(row1);
    +//                    get.addColumn(family, qualifier);
    +//                    table.get(t2,get);
    +//                    tm.commit(t2);
    +//                } catch (IOException e) {
    +//                    e.printStackTrace();
    +//                } catch (RollbackException e) {
    +//                    e.printStackTrace();
    +//                } catch (InterruptedException e) {
    +//                    e.printStackTrace();
    +//                }
    +//            }
    +//        };
    +//
    +//        HBaseTransaction t1 = (HBaseTransaction) tm.begin();
    +//        Put put = new Put(row1);
    +//        put.addColumn(family, qualifier, data1);
    +//        table.put(t1, put);
    +//
    +//        readThread.start();
    +//
    +//        //latch1.await();
    +//
    +//
    +//
    +//        boolean gotInvalidated = false;
    +//        try {
    +//            tm.commit(t1);
    +//        } catch (RollbackException e) {
    +//            gotInvalidated = true;
    +//        }
    +//
    +//        latch2.countDown();
    +//        assertFalse(gotInvalidated);
    +//
    +//
    +//        assertTrue(tm.isLowLatency());
    +//    }
    --- End diff --
    
    Minor nit: remove commented out code (or comment back in)


---

Reply via email to