http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java new file mode 100644 index 0000000..61c8aa0 --- /dev/null +++ b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java @@ -0,0 +1,675 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.fqltool; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import net.openhft.chronicle.queue.ExcerptAppender; +import net.openhft.chronicle.queue.ExcerptTailer; +import org.apache.cassandra.audit.FullQueryLogger; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.statements.BatchStatement; +import org.apache.cassandra.fqltool.commands.Compare; +import org.apache.cassandra.fqltool.commands.Replay; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.tools.Util; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.MergeIterator; +import org.apache.cassandra.utils.Pair; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class FQLReplayTest +{ + public FQLReplayTest() + { + Util.initDatabaseDescriptor(); + } + + @Test + public void testOrderedReplay() throws IOException + { + File f = generateQueries(100, true); + int queryCount = 0; + try (ChronicleQueue queue = ChronicleQueueBuilder.single(f).build(); + FQLQueryIterator iter = new FQLQueryIterator(queue.createTailer(), 101)) + { + long last = -1; + while (iter.hasNext()) + { + 
FQLQuery q = iter.next(); + assertTrue(q.queryStartTime >= last); + last = q.queryStartTime; + queryCount++; + } + } + assertEquals(100, queryCount); + } + + @Test + public void testQueryIterator() throws IOException + { + File f = generateQueries(100, false); + int queryCount = 0; + try (ChronicleQueue queue = ChronicleQueueBuilder.single(f).build(); + FQLQueryIterator iter = new FQLQueryIterator(queue.createTailer(), 1)) + { + long last = -1; + while (iter.hasNext()) + { + FQLQuery q = iter.next(); + assertTrue(q.queryStartTime >= last); + last = q.queryStartTime; + queryCount++; + } + } + assertEquals(100, queryCount); + } + + @Test + public void testMergingIterator() throws IOException + { + File f = generateQueries(100, false); + File f2 = generateQueries(100, false); + int queryCount = 0; + try (ChronicleQueue queue = ChronicleQueueBuilder.single(f).build(); + ChronicleQueue queue2 = ChronicleQueueBuilder.single(f2).build(); + FQLQueryIterator iter = new FQLQueryIterator(queue.createTailer(), 101); + FQLQueryIterator iter2 = new FQLQueryIterator(queue2.createTailer(), 101); + MergeIterator<FQLQuery, List<FQLQuery>> merger = MergeIterator.get(Lists.newArrayList(iter, iter2), FQLQuery::compareTo, new Replay.Reducer())) + { + long last = -1; + + while (merger.hasNext()) + { + List<FQLQuery> qs = merger.next(); + assertEquals(2, qs.size()); + assertEquals(0, qs.get(0).compareTo(qs.get(1))); + assertTrue(qs.get(0).queryStartTime >= last); + last = qs.get(0).queryStartTime; + queryCount++; + } + } + assertEquals(100, queryCount); + } + + @Test + public void testFQLQueryReader() throws IOException + { + FQLQueryReader reader = new FQLQueryReader(); + + try (ChronicleQueue queue = ChronicleQueueBuilder.single(generateQueries(1000, true)).build()) + { + ExcerptTailer tailer = queue.createTailer(); + int queryCount = 0; + while (tailer.readDocument(reader)) + { + assertNotNull(reader.getQuery()); + if (reader.getQuery() instanceof FQLQuery.Single) + { + 
assertTrue(reader.getQuery().keyspace() == null || reader.getQuery().keyspace().equals("querykeyspace")); + } + else + { + assertEquals("someks", reader.getQuery().keyspace()); + } + queryCount++; + } + assertEquals(1000, queryCount); + } + } + + @Test + public void testStoringResults() throws Throwable + { + File tmpDir = Files.createTempDirectory("results").toFile(); + File queryDir = Files.createTempDirectory("queries").toFile(); + + ResultHandler.ComparableResultSet res = createResultSet(10, 10, true); + ResultStore rs = new ResultStore(Collections.singletonList(tmpDir), queryDir); + FQLQuery query = new FQLQuery.Single("abc", QueryOptions.DEFAULT.getProtocolVersion().asInt(), QueryOptions.DEFAULT, 12345, 11111, 22, "select * from abc", Collections.emptyList()); + try + { + rs.storeColumnDefinitions(query, Collections.singletonList(res.getColumnDefinitions())); + Iterator<ResultHandler.ComparableRow> it = res.iterator(); + while (it.hasNext()) + { + List<ResultHandler.ComparableRow> row = Collections.singletonList(it.next()); + rs.storeRows(row); + } + // this marks the end of the result set: + rs.storeRows(Collections.singletonList(null)); + } + finally + { + rs.close(); + } + + compareResults(Collections.singletonList(Pair.create(query, res)), + readResultFile(tmpDir, queryDir)); + + } + + private static void compareResults(List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> expected, + List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> other) + { + ResultComparator comparator = new ResultComparator(); + assertEquals(expected.size(), other.size()); + for (int i = 0; i < expected.size(); i++) + { + assertEquals(expected.get(i).left, other.get(i).left); + ResultHandler.ComparableResultSet expectedResultSet = expected.get(i).right; + ResultHandler.ComparableResultSet otherResultSet = other.get(i).right; + List<String> hosts = Lists.newArrayList("a", "b"); + comparator.compareColumnDefinitions(hosts, + expected.get(i).left, + 
Lists.newArrayList(expectedResultSet.getColumnDefinitions(), + otherResultSet.getColumnDefinitions())); + Iterator<ResultHandler.ComparableRow> expectedRowIter = expectedResultSet.iterator(); + Iterator<ResultHandler.ComparableRow> otherRowIter = otherResultSet.iterator(); + + + while(expectedRowIter.hasNext() && otherRowIter.hasNext()) + { + ResultHandler.ComparableRow expectedRow = expectedRowIter.next(); + ResultHandler.ComparableRow otherRow = otherRowIter.next(); + assertTrue(comparator.compareRows(hosts, expected.get(i).left, Lists.newArrayList(expectedRow, otherRow))); + } + assertFalse(expectedRowIter.hasNext()); + assertFalse(otherRowIter.hasNext()); + } + } + + @Test + public void testCompareColumnDefinitions() + { + ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); + ResultComparator rc = new ResultComparator(); + + List<ResultHandler.ComparableColumnDefinitions> colDefs = new ArrayList<>(100); + List<String> targetHosts = new ArrayList<>(100); + for (int i = 0; i < 100; i++) + { + targetHosts.add("host"+i); + colDefs.add(res.getColumnDefinitions()); + } + assertTrue(rc.compareColumnDefinitions(targetHosts, null, colDefs)); + colDefs.set(50, createResultSet(9, 9, false).getColumnDefinitions()); + assertFalse(rc.compareColumnDefinitions(targetHosts, null, colDefs)); + } + + @Test + public void testCompareEqualRows() + { + ResultComparator rc = new ResultComparator(); + + ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); + ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false); + List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2); + List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList()); + + while (true) + { + List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters); + assertTrue(rc.compareRows(Lists.newArrayList("eq1", "eq2"), null, rows)); + if (rows.stream().allMatch(Objects::isNull)) 
+ break; + } + } + + @Test + public void testCompareRowsDifferentCount() + { + ResultComparator rc = new ResultComparator(); + ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); + ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false); + List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(10, 11, false)); + List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList()); + boolean foundMismatch = false; + while (true) + { + List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters); + if (rows.stream().allMatch(Objects::isNull)) + break; + if (!rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows)) + { + foundMismatch = true; + } + } + assertTrue(foundMismatch); + } + + @Test + public void testCompareRowsDifferentContent() + { + ResultComparator rc = new ResultComparator(); + ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); + ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false); + List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(10, 10, true)); + List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList()); + while (true) + { + List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters); + if (rows.stream().allMatch(Objects::isNull)) + break; + assertFalse(rows.toString(), rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows)); + } + } + + @Test + public void testCompareRowsDifferentColumnCount() + { + ResultComparator rc = new ResultComparator(); + ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); + ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false); + List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(11, 10, false)); + 
List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList()); + while (true) + { + List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters); + if (rows.stream().allMatch(Objects::isNull)) + break; + assertFalse(rows.toString(), rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows)); + } + } + + @Test + public void testResultHandler() throws IOException + { + List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc"); + File tmpDir = Files.createTempDirectory("testresulthandler").toFile(); + File queryDir = Files.createTempDirectory("queries").toFile(); + List<File> resultPaths = new ArrayList<>(); + targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);}); + + ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); + ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false); + ResultHandler.ComparableResultSet res3 = createResultSet(10, 10, false); + List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, res3); + FQLQuery query = new FQLQuery.Single("abcabc", QueryOptions.DEFAULT.getProtocolVersion().asInt(), QueryOptions.DEFAULT, 1111, 2222, 3333, "select * from xyz", Collections.emptyList()); + try (ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir)) + { + rh.handleResults(query, toCompare); + } + List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(resultPaths.get(0), queryDir); + List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results2 = readResultFile(resultPaths.get(1), queryDir); + List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results3 = readResultFile(resultPaths.get(2), queryDir); + compareResults(results1, results2); + compareResults(results1, results3); + compareResults(results3, Collections.singletonList(Pair.create(query, res))); + } + + @Test + public void testResultHandlerWithDifference() 
throws IOException + { + List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc"); + File tmpDir = Files.createTempDirectory("testresulthandler").toFile(); + File queryDir = Files.createTempDirectory("queries").toFile(); + List<File> resultPaths = new ArrayList<>(); + targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);}); + + ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); + ResultHandler.ComparableResultSet res2 = createResultSet(10, 5, false); + ResultHandler.ComparableResultSet res3 = createResultSet(10, 10, false); + List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, res3); + FQLQuery query = new FQLQuery.Single("aaa", QueryOptions.DEFAULT.getProtocolVersion().asInt(), QueryOptions.DEFAULT, 123123, 11111, 22222, "select * from abcabc", Collections.emptyList()); + try (ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir)) + { + rh.handleResults(query, toCompare); + } + List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(resultPaths.get(0), queryDir); + List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results2 = readResultFile(resultPaths.get(1), queryDir); + List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results3 = readResultFile(resultPaths.get(2), queryDir); + compareResults(results1, results3); + compareResults(results2, Collections.singletonList(Pair.create(query, res2))); + } + + @Test + public void testResultHandlerMultipleResultSets() throws IOException + { + List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc"); + File tmpDir = Files.createTempDirectory("testresulthandler").toFile(); + File queryDir = Files.createTempDirectory("queries").toFile(); + List<File> resultPaths = new ArrayList<>(); + targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);}); + List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> 
resultSets = new ArrayList<>(); + Random random = new Random(); + for (int i = 0; i < 10; i++) + { + List<ResultHandler.ComparableResultSet> results = new ArrayList<>(); + List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(i * 50)); + for (int jj = 0; jj < targetHosts.size(); jj++) + { + results.add(createResultSet(5, 1 + random.nextInt(10), true)); + } + FQLQuery q = i % 2 == 0 + ? new FQLQuery.Single("abc"+i, + 3, + QueryOptions.forInternalCalls(values), + i * 1000, + 12345, + 54321, + "select * from xyz where id = "+i, + values) + : new FQLQuery.Batch("abc"+i, + 3, + QueryOptions.forInternalCalls(values), + i * 1000, + i * 54321, + i * 12345, + com.datastax.driver.core.BatchStatement.Type.UNLOGGED, + Lists.newArrayList("select * from aaaa"), + Collections.singletonList(values)); + + resultSets.add(Pair.create(q, results)); + } + try (ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir)) + { + for (int i = 0; i < resultSets.size(); i++) + rh.handleResults(resultSets.get(i).left, resultSets.get(i).right); + } + + for (int i = 0; i < targetHosts.size(); i++) + compareWithFile(resultPaths, queryDir, resultSets, i); + } + + @Test + public void testResultHandlerFailedQuery() throws IOException + { + List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc", "hostd"); + File tmpDir = Files.createTempDirectory("testresulthandler").toFile(); + File queryDir = Files.createTempDirectory("queries").toFile(); + List<File> resultPaths = new ArrayList<>(); + targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);}); + + List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets = new ArrayList<>(); + Random random = new Random(); + for (int i = 0; i < 10; i++) + { + List<ResultHandler.ComparableResultSet> results = new ArrayList<>(); + List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(i * 50)); + for (int jj = 0; jj < targetHosts.size(); jj++) + { 
+ results.add(createResultSet(5, 1 + random.nextInt(10), true)); + } + results.set(0, StoredResultSet.failed("testing abc")); + results.set(3, StoredResultSet.failed("testing abc")); + FQLQuery q = new FQLQuery.Single("abc"+i, + 3, + QueryOptions.forInternalCalls(values), + i * 1000, + i * 12345, + i * 54321, + "select * from xyz where id = "+i, + values); + resultSets.add(Pair.create(q, results)); + } + try (ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir)) + { + for (int i = 0; i < resultSets.size(); i++) + rh.handleResults(resultSets.get(i).left, resultSets.get(i).right); + } + for (int i = 0; i < targetHosts.size(); i++) + compareWithFile(resultPaths, queryDir, resultSets, i); + } + + @Test + public void testCompare() + { + FQLQuery q1 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList()); + FQLQuery q2 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222,"aaaa", Collections.emptyList()); + + assertEquals(0, q1.compareTo(q2)); + assertEquals(0, q2.compareTo(q1)); + + FQLQuery q3 = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList()); + // single queries before batch queries + assertTrue(q1.compareTo(q3) < 0); + assertTrue(q3.compareTo(q1) > 0); + + // check that smaller query time + FQLQuery q4 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, "aaaa", Collections.emptyList()); + assertTrue(q1.compareTo(q4) < 0); + assertTrue(q4.compareTo(q1) > 0); + + FQLQuery q5 = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList()); + assertTrue(q1.compareTo(q5) < 0); + assertTrue(q5.compareTo(q1) > 0); + + FQLQuery q6 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", 
Collections.singletonList(ByteBufferUtil.bytes(10))); + FQLQuery q7 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList()); + assertTrue(q6.compareTo(q7) > 0); + assertTrue(q7.compareTo(q6) < 0); + + FQLQuery q8 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("a"))); + FQLQuery q9 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("b"))); + assertTrue(q8.compareTo(q9) < 0); + assertTrue(q9.compareTo(q8) > 0); + } + + @Test + public void testFQLQuerySingleToStatement() + { + List<ByteBuffer> values = new ArrayList<>(); + for (int i = 0; i < 10; i++) + values.add(ByteBufferUtil.bytes(i)); + FQLQuery.Single single = new FQLQuery.Single("xyz", + QueryOptions.DEFAULT.getProtocolVersion().asInt(), + QueryOptions.forInternalCalls(values), + 1234, + 12345, + 54321, + "select * from aaa", + values); + Statement stmt = single.toStatement(); + assertEquals(stmt.getDefaultTimestamp(), 12345); + assertTrue(stmt instanceof SimpleStatement); + SimpleStatement simpleStmt = (SimpleStatement)stmt; + assertEquals("select * from aaa",simpleStmt.getQueryString(CodecRegistry.DEFAULT_INSTANCE)); + assertArrayEquals(values.toArray(), simpleStmt.getValues(com.datastax.driver.core.ProtocolVersion.fromInt(QueryOptions.DEFAULT.getProtocolVersion().asInt()), CodecRegistry.DEFAULT_INSTANCE)); + } + + + @Test + public void testFQLQueryBatchToStatement() + { + List<List<ByteBuffer>> values = new ArrayList<>(); + List<String> queries = new ArrayList<>(); + for (int bqCount = 0; bqCount < 10; bqCount++) + { + queries.add("select * from asdf where x = ? 
and y = " + bqCount); + List<ByteBuffer> queryValues = new ArrayList<>(); + for (int i = 0; i < 10; i++) + queryValues.add(ByteBufferUtil.bytes(i + ":" + bqCount)); + values.add(queryValues); + } + + FQLQuery.Batch batch = new FQLQuery.Batch("xyz", + QueryOptions.DEFAULT.getProtocolVersion().asInt(), + QueryOptions.DEFAULT, + 1234, + 12345, + 54321, + com.datastax.driver.core.BatchStatement.Type.UNLOGGED, + queries, + values); + Statement stmt = batch.toStatement(); + assertEquals(stmt.getDefaultTimestamp(), 12345); + assertTrue(stmt instanceof com.datastax.driver.core.BatchStatement); + com.datastax.driver.core.BatchStatement batchStmt = (com.datastax.driver.core.BatchStatement)stmt; + List<Statement> statements = Lists.newArrayList(batchStmt.getStatements()); + List<Statement> fromFQLQueries = batch.queries.stream().map(FQLQuery.Single::toStatement).collect(Collectors.toList()); + assertEquals(statements.size(), fromFQLQueries.size()); + assertEquals(12345, batchStmt.getDefaultTimestamp()); + for (int i = 0; i < statements.size(); i++) + compareStatements(statements.get(i), fromFQLQueries.get(i)); + } + + private void compareStatements(Statement statement1, Statement statement2) + { + assertTrue(statement1 instanceof SimpleStatement && statement2 instanceof SimpleStatement); + SimpleStatement simpleStmt1 = (SimpleStatement)statement1; + SimpleStatement simpleStmt2 = (SimpleStatement)statement2; + assertEquals(simpleStmt1.getQueryString(CodecRegistry.DEFAULT_INSTANCE), simpleStmt2.getQueryString(CodecRegistry.DEFAULT_INSTANCE)); + assertArrayEquals(simpleStmt1.getValues(com.datastax.driver.core.ProtocolVersion.fromInt(QueryOptions.DEFAULT.getProtocolVersion().asInt()), CodecRegistry.DEFAULT_INSTANCE), + simpleStmt2.getValues(com.datastax.driver.core.ProtocolVersion.fromInt(QueryOptions.DEFAULT.getProtocolVersion().asInt()), CodecRegistry.DEFAULT_INSTANCE)); + + } + + private File generateQueries(int count, boolean random) throws IOException + { + Random r = new 
Random(); + File dir = Files.createTempDirectory("chronicle").toFile(); + try (ChronicleQueue readQueue = ChronicleQueueBuilder.single(dir).build()) + { + ExcerptAppender appender = readQueue.acquireAppender(); + + for (int i = 0; i < count; i++) + { + long timestamp = random ? Math.abs(r.nextLong() % 10000) : i; + if (random ? r.nextBoolean() : i % 2 == 0) + { + String query = "abcdefghijklm " + i; + QueryState qs = r.nextBoolean() ? queryState() : queryState("querykeyspace"); + FullQueryLogger.Query q = new FullQueryLogger.Query(query, QueryOptions.DEFAULT, qs, timestamp); + appender.writeDocument(q); + q.release(); + } + else + { + int batchSize = random ? r.nextInt(99) + 1 : i + 1; + List<String> queries = new ArrayList<>(batchSize); + List<List<ByteBuffer>> values = new ArrayList<>(batchSize); + for (int jj = 0; jj < (random ? r.nextInt(batchSize) : 10); jj++) + { + queries.add("aaaaaa batch "+i+":"+jj); + values.add(Collections.emptyList()); + } + FullQueryLogger.Batch batch = new FullQueryLogger.Batch(BatchStatement.Type.UNLOGGED, + queries, + values, + QueryOptions.DEFAULT, + queryState("someks"), + timestamp); + appender.writeDocument(batch); + batch.release(); + } + } + } + return dir; + } + + private QueryState queryState() + { + return QueryState.forInternalCalls(); + } + + private QueryState queryState(String keyspace) + { + ClientState clientState = ClientState.forInternalCalls(keyspace); + return new QueryState(clientState); + } + + static ResultHandler.ComparableResultSet createResultSet(int columnCount, int rowCount, boolean random) + { + List<Pair<String, String>> columnDefs = new ArrayList<>(columnCount); + Random r = new Random(); + for (int i = 0; i < columnCount; i++) + { + columnDefs.add(Pair.create("a" + i, "int")); + } + ResultHandler.ComparableColumnDefinitions colDefs = new StoredResultSet.StoredComparableColumnDefinitions(columnDefs, false, null); + List<ResultHandler.ComparableRow> rows = new ArrayList<>(); + for (int i = 0; i < 
rowCount; i++) + { + List<ByteBuffer> row = new ArrayList<>(columnCount); + for (int jj = 0; jj < columnCount; jj++) + row.add(ByteBufferUtil.bytes(i + " col " + jj + (random ? r.nextInt() : ""))); + + rows.add(new StoredResultSet.StoredComparableRow(row, colDefs)); + } + return new StoredResultSet(colDefs, true, false, null, rows::iterator); + } + + private static void compareWithFile(List<File> dirs, File resultDir, List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets, int idx) + { + List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(dirs.get(idx), resultDir); + for (int i = 0; i < resultSets.size(); i++) + { + FQLQuery query = resultSets.get(i).left; + List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> toCompare = Collections.singletonList(Pair.create(query, resultSets.get(i).right.get(idx))); + compareResults(Collections.singletonList(results1.get(i)), toCompare); + } + } + + private static List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> readResultFile(File dir, File queryDir) + { + List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> resultSets = new ArrayList<>(); + try (ChronicleQueue q = ChronicleQueueBuilder.single(dir).build(); + ChronicleQueue queryQ = ChronicleQueueBuilder.single(queryDir).build()) + { + ExcerptTailer queryTailer = queryQ.createTailer(); + FQLQueryReader queryReader = new FQLQueryReader(); + Compare.StoredResultSetIterator resultSetIterator = new Compare.StoredResultSetIterator(q.createTailer()); + // we need to materialize the rows in-memory to compare them easier in these tests + while (resultSetIterator.hasNext()) + { + ResultHandler.ComparableResultSet resultSetFromDisk = resultSetIterator.next(); + Iterator<ResultHandler.ComparableRow> rowIterFromDisk = resultSetFromDisk.iterator(); + queryTailer.readDocument(queryReader); + + FQLQuery query = queryReader.getQuery(); + List<ResultHandler.ComparableRow> rows = new ArrayList<>(); + while (rowIterFromDisk.hasNext()) + 
{ + rows.add(rowIterFromDisk.next()); + } + resultSets.add(Pair.create(query, new StoredResultSet(resultSetFromDisk.getColumnDefinitions(), + resultSetIterator.hasNext(), + resultSetFromDisk.wasFailed(), + resultSetFromDisk.getFailureException(), + rows::iterator))); + } + } + return resultSets; + } +}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org