This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch session_pool in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 19a78a105b8e804fdc22785bdf6efffce0d75f8c Author: qiaojialin <[email protected]> AuthorDate: Sat Mar 14 10:18:40 2020 +0800 add session pool --- .../UserGuide/4-Client/3-Programming - Session.md | 18 +- .../UserGuide/4-Client/3-Programming - Session.md | 24 +- .../java/org/apache/iotdb/session/Session.java | 14 +- .../org/apache/iotdb/session/SessionDataSet.java | 11 +- .../iotdb/session/pool/SessionDataSetWrapper.java | 67 ++++ .../org/apache/iotdb/session/pool/SessionPool.java | 437 +++++++++++++++++++++ .../apache/iotdb/session/pool/SessionPoolTest.java | 295 ++++++++++++++ 7 files changed, 860 insertions(+), 6 deletions(-) diff --git a/docs/Documentation-CHN/UserGuide/4-Client/3-Programming - Session.md b/docs/Documentation-CHN/UserGuide/4-Client/3-Programming - Session.md index 1db8623..d7aeb1d 100644 --- a/docs/Documentation-CHN/UserGuide/4-Client/3-Programming - Session.md +++ b/docs/Documentation-CHN/UserGuide/4-Client/3-Programming - Session.md @@ -103,4 +103,20 @@ 浏览上述接口的详细信息,请参阅代码session/src/main/java/org/apache/iotdb/session/Session.java - 使用上述接口的示例代码在example/session/src/main/java/org/apache/iotdb/SessionExample.java,在此文件中包含了开启session和执行批量插入等操作 \ No newline at end of file + 使用上述接口的示例代码在example/session/src/main/java/org/apache/iotdb/SessionExample.java,在此文件中包含了开启session和执行批量插入等操作 + + # 针对原生接口的连接池 + + 我们提供了一个针对原生接口的连接池(`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。 + 如果超过60s都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。 + + 当一个连接被用完后,他会自动返回池中等待下次被使用; + 当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作。 + + 对于查询操作: + + 1. 使用SessionPool进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`; + 2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`; + 3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`. 
+ + 使用示例可以参见 ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java``` diff --git a/docs/Documentation/UserGuide/4-Client/3-Programming - Session.md b/docs/Documentation/UserGuide/4-Client/3-Programming - Session.md index cf446ef..18cd09b 100644 --- a/docs/Documentation/UserGuide/4-Client/3-Programming - Session.md +++ b/docs/Documentation/UserGuide/4-Client/3-Programming - Session.md @@ -113,4 +113,26 @@ Here we show the commonly used interfaces and their parameters in the Session: To get more information of the following interfaces, please view session/src/main/java/org/apache/iotdb/session/Session.java -The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion. \ No newline at end of file +The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion. + + +# Session Pool for Native API + +We provide a connection pool (`SessionPool`) for Native API. +Using the interface, you need to define the pool size. + +If you cannot get a session connection in 60 seconds, a warning is logged but the program keeps waiting. + +If a session has finished an operation, it will be put back to the pool automatically. +If a session connection is broken, the session will be removed automatically and the pool will try +to create a new session and redo the operation. + +For query operations: + +1. When using SessionPool to query data, the result set is `SessionDataSetWrapper`; +2. Given a `SessionDataSetWrapper`, if you have not scanned all the data in it and stop using it, +you have to call `SessionPool.closeResultSet(wrapper)` manually; +3. 
When you call `hasNext()` and `next()` of a `SessionDataSetWrapper` and there is an exception, then +you have to call `SessionPool.closeResultSet(wrapper)` manually; + +Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java``` diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index a3466a2..78791b9 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -59,6 +59,7 @@ public class Session { private ZoneId zoneId; private TSOperationHandle operationHandle; private long statementId; + private int fetchSize; public Session(String host, int port) { @@ -74,6 +75,15 @@ public class Session { this.port = port; this.username = username; this.password = password; + this.fetchSize = 10000; + } + + public Session(String host, int port, String username, String password, int fetchSize) { + this.host = host; + this.port = port; + this.username = username; + this.password = password; + this.fetchSize = fetchSize; } public synchronized void open() throws IoTDBSessionException { @@ -346,8 +356,10 @@ public class Session { RpcUtils.verifySuccess(execResp.getStatus()); operationHandle = execResp.getOperationHandle(); - return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(), + SessionDataSet dataSet = new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(), operationHandle.getOperationId().getQueryId(), client, operationHandle); + dataSet.setBatchSize(fetchSize); + return dataSet; } /** diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java index fd055d3..5e5ecbe 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java @@ -47,6 +47,7 @@ public class 
SessionDataSet { private TSOperationHandle operationHandle; private int batchSize = 512; private List<String> columnTypeDeduplicatedList; + private List<String> columnNames; public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList, long queryId, TSIService.Iface client, TSOperationHandle operationHandle) { @@ -54,7 +55,7 @@ public class SessionDataSet { this.queryId = queryId; this.client = client; this.operationHandle = operationHandle; - + this.columnNames = columnNameList; // deduplicate columnTypeList according to columnNameList this.columnTypeDeduplicatedList = new ArrayList<>(); Set<String> columnSet = new HashSet<>(); // for deduplication @@ -67,6 +68,10 @@ public class SessionDataSet { } } + public List<String> getColumnNames() { + return columnNames; + } + public int getBatchSize() { return batchSize; } @@ -127,10 +132,10 @@ public class SessionDataSet { RpcUtils.verifySuccess(closeResp); } } catch (IoTDBRPCException e) { - throw new SQLException("Error occurs for close opeation in server side. The reason is " + e); + throw new SQLException("Error occurs for close opeation in server side. The reason is " + e, e); } catch (TException e) { throw new SQLException( - "Error occurs when connecting to server for close operation, because: " + e); + "Error occurs when connecting to server for close operation, because: " + e, e); } } } diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java new file mode 100644 index 0000000..2f3a067 --- /dev/null +++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session.pool; + +import java.sql.SQLException; +import java.util.List; +import org.apache.iotdb.rpc.IoTDBRPCException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.tsfile.read.common.RowRecord; + +public class SessionDataSetWrapper { + SessionDataSet sessionDataSet; + Session session; + SessionPool pool; + + public SessionDataSetWrapper(SessionDataSet sessionDataSet, + Session session, SessionPool pool) { + this.sessionDataSet = sessionDataSet; + this.session = session; + this.pool = pool; + } + + protected Session getSession() { + return session; + } + + public int getBatchSize() { + return sessionDataSet.getBatchSize(); + } + + public void setBatchSize(int batchSize) { + sessionDataSet.setBatchSize(batchSize); + } + + public boolean hasNext() throws SQLException, IoTDBRPCException { + boolean next = sessionDataSet.hasNext(); + if (!next) { + pool.closeResultSet(this); + } + return next; + } + + public RowRecord next() throws SQLException, IoTDBRPCException { + return sessionDataSet.next(); + } + + public List<String> getColumnNames() { + return sessionDataSet.getColumnNames(); + } +} \ No newline at end of file diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java new file mode 100644 
index 0000000..9918ad9 --- /dev/null +++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session.pool; + +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentMap; +import org.apache.iotdb.rpc.IoTDBRPCException; +import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp; +import org.apache.iotdb.service.rpc.thrift.TSStatus; +import org.apache.iotdb.session.IoTDBSessionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.write.record.RowBatch; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * SessionPool is a wrapper of a Session Set. 
+ * Using SessionPool, the user do not need to consider how to reuse a session connection. + * Even if the session is disconnected, the session pool can recognize it and remove the broken + * session connection and create a new one. + * + * If there is no available connections and the pool reaches its max size, the all methods will hang + * until there is a available connection. + * + * If a user has waited for a session for more than 60 seconds, a warn log will be printed. + * + * The only thing you have to remember is that: + * + * For a query, if you have get all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok. + * Otherwise, i.e., you want to stop the query before you get all data (SessionDataSetWrapper.hasNext() == true), + * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually. + * Otherwise the connection is occupied by the query. + * + * Another case that you have to manually call closeResultSet() is that when there is exception + * when you call SessionDataSetWrapper.hasNext() or next() + * + */ +public class SessionPool { + + private static final Logger logger = LoggerFactory.getLogger(SessionPool.class); + private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>(); + //for session whose resultSet is not released. 
+ private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>(); + + private int size = 0; + private int maxSize = 0; + private String ip; + private int port; + private String user; + private String password; + + private int fetchSize; + + private long timeout = 60*1000; //ms + private static int RETRY = 3; + + public SessionPool(String ip, int port, String user, String password, int maxSize) { + this(ip, port, user, password, maxSize, 10000, 60_000); + } + + public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize, long timeout) { + this.maxSize = maxSize; + this.ip = ip; + this.port = port; + this.user = user; + this.password = password; + this.fetchSize = fetchSize; + this.timeout = timeout; + } + + //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect. + //TODO: we can add a mechanism that if the user waits too long time, throw exception. + private Session getSession() throws IoTDBSessionException { + Session session = queue.poll(); + if (session != null) { + return session; + } else { + synchronized (this) { + long start = System.currentTimeMillis(); + while (session == null) { + if (size < maxSize) { + //we can create more session + size++; + //but we do it after skip synchronized block because connection a session is time consuming. + break; + } else { + //we have to wait for someone returns a session. 
+ try { + this.wait(1000); + if (System.currentTimeMillis() - start > 60_000) { + logger.warn( + "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}", + (System.currentTimeMillis() - start) / 1000, ip, port, user, password); + if (System.currentTimeMillis() - start > timeout) { + throw new IoTDBSessionException(String.format("timeout to get a connection from %s:%s", ip, port)); + } + } + } catch (InterruptedException e) { + logger.error("the SessionPool is damaged", e); + Thread.currentThread().interrupt(); + } + session = queue.poll(); + } + } + if (session != null) { + return session; + } + } + if (logger.isDebugEnabled()) { + logger.error("Create a new Session {}, {}, {}, {}", ip, port, user, password); + } + session = new Session(ip, port, user, password, fetchSize); + session.open(); + return session; + } + } + + public int currentAvailableSize() { + return queue.size(); + } + + public int currentOccupiedSize() { + return occupied.size(); + } + + private void putBack(Session session) { + queue.push(session); + synchronized (this) { + this.notifyAll(); + } + } + + private void occupy(Session session) { + occupied.put(session, session); + } + + /** + * close all connections in the pool + */ + public synchronized void close() { + for (Session session : queue) { + try { + session.close(); + } catch (IoTDBSessionException e) { + //do nothing + } + } + for (Session session : occupied.keySet()) { + try { + session.close(); + } catch (IoTDBSessionException e) { + //do nothing + } + } + queue.clear(); + occupied.clear(); + } + + public void closeResultSet(SessionDataSetWrapper wrapper) throws SQLException { + boolean putback = true; + try { + wrapper.sessionDataSet.closeOperationHandle(); + } catch (SQLException e) { + if (e.getCause() instanceof TException) { + // the connection is broken. 
+ removeSession(); + putback = false; + } else { + throw e; + } + } finally { + Session session = occupied.remove(wrapper.session); + if (putback && session != null) { + putBack(wrapper.session); + } + } + } + + private synchronized void removeSession() { + if (logger.isDebugEnabled()) { + logger.error("Remove a broken Session {}, {}, {}, {}", ip, port, user, password); + } + size--; + } + + private void closeSession(Session session) { + if (session != null) { + try { + session.close(); + } catch (Exception e2) { + //do nothing. We just want to guarantee the session is closed. + } + } + } + + /** + * use batch interface to insert data + * + * @param rowBatch data batch + */ + public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch) throws IoTDBSessionException { + for (int i=0; i< RETRY; i ++){ + Session session = getSession(); + try { + TSExecuteBatchStatementResp resp = session.insertBatch(rowBatch); + putBack(session); + return resp; + } catch (IoTDBSessionException e) { + if (e.getCause() instanceof TException) { + // TException means the connection is broken, remove it and get a new one. 
+ closeSession(session); + removeSession(); + } else { + putBack(session); + throw e; + } + } + } + throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY)); + } + + + + /** + * insert data in one row, if you want improve your performance, please use insertInBatch method + * or insertBatch method + * + * @see Session#insertBatch(RowBatch) + */ + public TSStatus insert(String deviceId, long time, List<String> measurements, List<String> values) + throws IoTDBSessionException { + for (int i=0; i< RETRY; i ++){ + Session session = getSession(); + try { + TSStatus resp = session.insert(deviceId, time, measurements, values); + putBack(session); + return resp; + } catch (IoTDBSessionException e) { + if (e.getCause() instanceof TException) { + // TException means the connection is broken, remove it and get a new one. + closeSession(session); + removeSession(); + } else { + putBack(session); + throw e; + } + } + } + throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY)); + } + + /** + * delete a timeseries, including data and schema + * + * @param paths timeseries to delete, should be a whole path + */ + public TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException { + for (int i=0; i< RETRY; i ++){ + Session session = getSession(); + try { + TSStatus resp = session.deleteTimeseries(paths); + putBack(session); + return resp; + } catch (IoTDBSessionException e) { + if (e.getCause() instanceof TException) { + // TException means the connection is broken, remove it and get a new one. 
+ closeSession(session); + removeSession(); + } else { + putBack(session); + throw e; + } + } + } + throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY)); + } + + /** + * delete data <= time in one timeseries + * + * @param path data in which time series to delete + * @param time data with time stamp less than or equal to time will be deleted + */ + public TSStatus deleteData(String path, long time) throws IoTDBSessionException { + for (int i=0; i< RETRY; i ++){ + Session session = getSession(); + try { + TSStatus resp = session.deleteData(path, time); + putBack(session); + return resp; + } catch (IoTDBSessionException e) { + if (e.getCause() instanceof TException) { + // TException means the connection is broken, remove it and get a new one. + closeSession(session); + removeSession(); + } else { + putBack(session); + throw e; + } + } + } + throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY)); + } + + public TSStatus setStorageGroup(String storageGroupId) throws IoTDBSessionException { + for (int i=0; i< RETRY; i ++){ + Session session = getSession(); + try { + TSStatus resp = session.setStorageGroup(storageGroupId); + putBack(session); + return resp; + } catch (IoTDBSessionException e) { + if (e.getCause() instanceof TException) { + // TException means the connection is broken, remove it and get a new one. 
+ closeSession(session); + removeSession(); + } else { + putBack(session); + throw e; + } + } + } + throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY)); + } + + public TSStatus createTimeseries(String path, TSDataType dataType, TSEncoding encoding, + CompressionType compressor) throws IoTDBSessionException { + for (int i=0; i< RETRY; i ++){ + Session session = getSession(); + try { + TSStatus resp = session.createTimeseries(path, dataType, encoding, compressor); + putBack(session); + return resp; + } catch (IoTDBSessionException e) { + if (e.getCause() instanceof TException) { + // TException means the connection is broken, remove it and get a new one. + closeSession(session); + removeSession(); + } else { + putBack(session); + throw e; + } + } + } + throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY)); + } + + /** + * execure query sql users must call closeResultSet(SessionDataSetWrapper) if they do not use the + * SessionDataSet any more. users do not need to call sessionDataSet.closeOpeationHandler() any + * more. + * + * @param sql query statement + * @return result set Notice that you must get the result instance. Otherwise a data leakage will happen + */ + public SessionDataSetWrapper executeQueryStatement(String sql) + throws IoTDBRPCException, IoTDBSessionException { + for (int i=0; i< RETRY; i ++){ + Session session = getSession(); + try { + SessionDataSet resp = session.executeQueryStatement(sql); + SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this); + occupy(session); + return wrapper; + } catch (TException e) { + // TException means the connection is broken, remove it and get a new one. + closeSession(session); + removeSession(); + } catch (IoTDBRPCException e) { + if (e.getCause() instanceof TException) { + // TException means the connection is broken, remove it and get a new one. 
+ closeSession(session); + removeSession(); + } else { + putBack(session); + throw e; + } + } + } + throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY)); + } + + /** + * execute non query statement + * + * @param sql non query statement + */ + public void executeNonQueryStatement(String sql) + throws IoTDBRPCException, IoTDBSessionException { + for (int i=0; i< RETRY; i ++){ + Session session = getSession(); + try { + session.executeNonQueryStatement(sql); + putBack(session); + return; + } catch (TException e) { + // TException means the connection is broken, remove it and get a new one. + closeSession(session); + removeSession(); + } catch (IoTDBRPCException e) { + if (e.getCause() instanceof TException) { + // TException means the connection is broken, remove it and get a new one. + closeSession(session); + removeSession(); + } else { + putBack(session); + throw e; + } + } + } + throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY)); + } +} diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java new file mode 100644 index 0000000..440bb7b --- /dev/null +++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session.pool; + +import static org.junit.Assert.*; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.iotdb.db.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.rpc.IoTDBRPCException; +import org.apache.iotdb.session.IoTDBSessionException; +import org.apache.iotdb.session.utils.EnvironmentUtils; +import org.apache.thrift.TException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +//this test is not for testing the correctness of Session API. So we just implement one of the API. 
+public class SessionPoolTest { + IoTDB daemon; + @Before + public void setUp() throws Exception { + System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/"); + IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true); + daemon = IoTDB.getInstance(); + daemon.active(); + EnvironmentUtils.closeStatMonitor(); + EnvironmentUtils.envSetUp(); + } + + @After + public void tearDown() throws Exception { + daemon.stop(); + EnvironmentUtils.cleanEnv(); + } + + + @Test + public void insert() { + SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3); + ExecutorService service = Executors.newFixedThreadPool(10); + try { + pool.setStorageGroup("root.sg1"); + } catch (IoTDBSessionException e) { + fail(); + } + for (int i = 0; i < 10; i++) { + final int no = i; + service.submit(() -> { + try { + pool.insert("root.sg1.d1", 1, Collections.singletonList("s" + no), + Collections.singletonList("3")); + } catch (IoTDBSessionException e) { + fail(); + } + }); + } + service.shutdown(); + try { + assertTrue(service.awaitTermination(10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + assertTrue(pool.currentAvailableSize() <= 3); + assertEquals(0, pool.currentOccupiedSize()); + pool.close(); + } + + @Test + public void incorrectSQL() { + SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3); + assertEquals(0, pool.currentAvailableSize()); + try { + pool.setStorageGroup("root.sg1"); + } catch (IoTDBSessionException e) { + fail(); + } + try { + pool.insert(".root.sg1.d1", 1, Collections.singletonList("s"), + Collections.singletonList("3")); + } catch (IoTDBSessionException e) { + //do nothing + } + assertEquals(1, pool.currentAvailableSize()); + pool.close(); + } + + + @Test + public void incorrectExecuteQueryStatement() { + SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3); + ExecutorService service = Executors.newFixedThreadPool(10); + try { + 
pool.setStorageGroup("root.sg1"); + } catch (IoTDBSessionException e) { + fail(); + } + for (int i = 0; i < 10; i++) { + try { + pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), + Collections.singletonList("" + i)); + } catch (IoTDBSessionException e) { + fail(); + } + } + //now let's query + for (int i = 0; i < 10; i++) { + final int no = i; + service.submit(() -> { + try { + SessionDataSetWrapper wrapper = pool + .executeQueryStatement("select * from root.sg1.d1 where time = " + no); + //this is incorrect becasue wrapper is not closed. + //so all other 7 queries will be blocked + } catch (IoTDBSessionException e) { + fail(); + } catch (IoTDBRPCException e) { + e.printStackTrace(); + } + }); + } + service.shutdown(); + try { + assertFalse(service.awaitTermination(3, TimeUnit.SECONDS)); + assertEquals(0, pool.currentAvailableSize()); + assertTrue(pool.currentOccupiedSize() <= 3); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + pool.close(); + } + + @Test + public void executeQueryStatement() { + SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3); + correctQuery(pool); + pool.close(); + } + + private void correctQuery(SessionPool pool) { + ExecutorService service = Executors.newFixedThreadPool(10); + try { + pool.setStorageGroup("root.sg1"); + } catch (IoTDBSessionException e) { + } + for (int i = 0; i < 10; i++) { + try { + pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), + Collections.singletonList("" + i)); + } catch (IoTDBSessionException e) { + fail(); + } + } + //now let's query + for (int i = 0; i < 10; i++) { + final int no = i; + service.submit(() -> { + try { + SessionDataSetWrapper wrapper = pool + .executeQueryStatement("select * from root.sg1.d1 where time = " + no); + pool.closeResultSet(wrapper); + pool.closeResultSet(wrapper); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + }); + } + service.shutdown(); + try { + 
assertTrue(service.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(pool.currentAvailableSize() <= 3); + assertEquals(0, pool.currentOccupiedSize()); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + } + +// @Test + //failed because the server can not be closed. + public void tryIfTheServerIsRestart() { + SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000); + try { + pool.setStorageGroup("root.sg1"); + } catch (IoTDBSessionException e) { + fail(); + } + for (int i = 0; i < 10; i++) { + try { + pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), + Collections.singletonList("" + i)); + } catch (IoTDBSessionException e) { + fail(); + } + } + SessionDataSetWrapper wrapper = null; + try { + wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1"); + daemon.stop(); + //user does not know what happens. + while (wrapper.hasNext()) { + wrapper.next(); + } + } catch (IoTDBRPCException e) { + e.printStackTrace(); + fail(); + } catch (IoTDBSessionException e) { + e.printStackTrace(); + fail(); + } catch (SQLException e) { + if (e.getCause() instanceof TException) { + try { + pool.closeResultSet(wrapper); + } catch (SQLException ex) { + ex.printStackTrace(); + fail(); + } + } else { + fail("should be TTransportException but get an exception: " + e.getMessage()); + } + daemon.active(); + correctQuery(pool); + pool.close(); + return; + } + fail("should throw exception but not"); + } + + //@Test + //failed because the server can not be closed. 
+ public void tryIfTheServerIsRestartButDataIsGotten() { + SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000); + try { + pool.setStorageGroup("root.sg1"); + } catch (IoTDBSessionException e) { + fail(); + } + for (int i = 0; i < 10; i++) { + try { + pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), + Collections.singletonList("" + i)); + } catch (IoTDBSessionException e) { + fail(); + } + } + assertEquals(1, pool.currentAvailableSize()); + SessionDataSetWrapper wrapper = null; + try { + wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1"); + //user does not know what happens. + assertEquals(0, pool.currentAvailableSize()); + assertEquals(1, pool.currentOccupiedSize()); + while (wrapper.hasNext()) { + wrapper.next(); + } + assertEquals(1, pool.currentAvailableSize()); + assertEquals(0, pool.currentOccupiedSize()); + } catch (IoTDBRPCException e) { + e.printStackTrace(); + fail(); + } catch (IoTDBSessionException e) { + e.printStackTrace(); + fail(); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + pool.close(); + } + +} \ No newline at end of file
