This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 1fbd329 Release session from cache when closing QueryReplayer 1fbd329 is described below commit 1fbd3297a9c8303ca7aa2ff30d182e5ca568de4c Author: yifan-c <yc25c...@gmail.com> AuthorDate: Fri Jan 17 22:13:50 2020 -0800 Release session from cache when closing QueryReplayer Patch by Yifan Cai; reviewed by marcuse for CASSANDRA-15514 --- build.xml | 1 + .../test/QueryReplayerEndToEndTest.java | 92 ++++++++++++++++++++++ .../apache/cassandra/fqltool/QueryReplayer.java | 10 +-- 3 files changed, 98 insertions(+), 5 deletions(-) diff --git a/build.xml b/build.xml index 51ba2f5..961af39 100644 --- a/build.xml +++ b/build.xml @@ -1313,6 +1313,7 @@ encoding="utf-8"> <classpath> <path refid="cassandra.classpath"/> + <pathelement location="${fqltool.build.classes}"/> </classpath> <compilerarg value="-XDignore.symbol.file"/> <src path="${test.unit.src}"/> diff --git a/test/distributed/org/apache/cassandra/distributed/test/QueryReplayerEndToEndTest.java b/test/distributed/org/apache/cassandra/distributed/test/QueryReplayerEndToEndTest.java new file mode 100644 index 0000000..9908fcc --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/QueryReplayerEndToEndTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.fqltool.FQLQuery; +import org.apache.cassandra.fqltool.QueryReplayer; + +public class QueryReplayerEndToEndTest extends DistributedTestBase +{ + private final AtomicLong queryStartTimeGenerator = new AtomicLong(1000); + private final AtomicInteger ckGenerator = new AtomicInteger(1); + + @Test + public void testReplayAndCloseMultipleTimes() throws Throwable + { + try (Cluster cluster = init(Cluster.create(3, conf -> conf.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP, Feature.NETWORK)))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + List<String> hosts = cluster.stream() + .map(i -> i.config().broadcastAddressAndPort().address.getHostAddress()) + .collect(Collectors.toList()); + + final int queriesCount = 3; + // replay for the first time, it should pass + replayAndClose(Collections.singletonList(makeFQLQueries(queriesCount)), hosts); + // replay for the second time, it should pass too + // however, if the cached sessions are not released, the second replay will reused the closed sessions from previous replay and fail to insert + replayAndClose(Collections.singletonList(makeFQLQueries(queriesCount)), hosts); + Object[][] result = cluster.coordinator(1) + .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", + ConsistencyLevel.QUORUM, 1); + Assert.assertEquals(String.format("Expecting to see %d rows since it replayed twice and each with %d queries", queriesCount * 2, queriesCount), + queriesCount * 2, result.length); + } + } + + private void replayAndClose(List<List<FQLQuery>> allFqlQueries, List<String> hosts) throws IOException + { + List<Predicate<FQLQuery>> allowAll = Collections.singletonList(fqlQuery -> true); + try (QueryReplayer queryReplayer = new QueryReplayer(allFqlQueries.iterator(), hosts, null, allowAll, null)) + { + queryReplayer.replay(); + } + } + + // generate a new list of FQLQuery for each invocation + private List<FQLQuery> makeFQLQueries(int n) + { + return IntStream.range(0, n) + .boxed() + .map(i -> new FQLQuery.Single(KEYSPACE, + QueryOptions.DEFAULT.getProtocolVersion().asInt(), + QueryOptions.DEFAULT, queryStartTimeGenerator.incrementAndGet(), + 2222, + 3333, + String.format("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, %d, %d)", KEYSPACE, ckGenerator.incrementAndGet(), i), + Collections.emptyList())) + .collect(Collectors.toList()); + } +} diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java index b17ec53..4524e33 100644 --- a/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java @@ -257,18 +257,18 @@ public class QueryReplayer implements Closeable public void close() { - for (Session s : sessionCache.values()) - { - try + sessionCache.entrySet().removeIf(entry -> { + try (Session s = entry.getValue()) { - s.close(); s.getCluster().close(); + return true; } catch (Throwable t) { logger.error("Could not close connection", t); + return false; } - } + }); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org