This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch fast_write_test_with_guoneng in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1fa9fd5b8d68d1543890ed6a40c23083ca076e3e Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Apr 24 15:12:19 2023 +0800 keep guoneng test example --- .../iotdb/AlignedTimeseriesSessionExample.java | 634 --------------- .../org/apache/iotdb/DataMigrationExample.java | 186 ----- .../java/org/apache/iotdb/FastInsertExample.java | 93 --- .../iotdb/HybridTimeseriesSessionExample.java | 122 --- .../src/main/java/org/apache/iotdb/ReadTest.java | 269 +++++++ .../org/apache/iotdb/SessionConcurrentExample.java | 188 ----- .../main/java/org/apache/iotdb/SessionExample.java | 879 --------------------- .../java/org/apache/iotdb/SessionPoolExample.java | 148 ---- .../iotdb/SyntaxConventionRelatedExample.java | 147 ---- .../main/java/org/apache/iotdb/TabletExample.java | 194 ----- .../src/main/java/org/apache/iotdb/WriteTest.java | 228 ++++++ .../org/apache/iotdb/WriteTestFixParallel.java | 225 ++++++ 12 files changed, 722 insertions(+), 2591 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java deleted file mode 100644 index eecceb15ca..0000000000 --- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java +++ /dev/null @@ -1,634 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb; - -import org.apache.iotdb.isession.SessionDataSet; -import org.apache.iotdb.isession.template.Template; -import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.session.Session; -import org.apache.iotdb.session.template.InternalNode; -import org.apache.iotdb.session.template.MeasurementNode; -import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.utils.BitMap; -import org.apache.iotdb.tsfile.write.record.Tablet; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - -import java.io.IOException; -import java.security.SecureRandom; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@SuppressWarnings("squid:S106") -public class AlignedTimeseriesSessionExample { - - private static Session session; - private static final String ROOT_SG1_D1 = "root.sg_1.d1"; - private static final String ROOT_SG1_D1_VECTOR2 = "root.sg_1.d1.vector2"; - private static final String ROOT_SG1_D1_VECTOR3 = "root.sg_1.d1.vector3"; - private static final String ROOT_SG2_D1_VECTOR4 = "root.sg_2.d1.vector4"; - private static final String ROOT_SG2_D1_VECTOR5 = "root.sg_2.d1.vector5"; - private static final String ROOT_SG2_D1_VECTOR6 = "root.sg_2.d1.vector6"; - private static final String ROOT_SG2_D1_VECTOR7 = "root.sg_2.d1.vector7"; - private static final String ROOT_SG2_D1_VECTOR8 = "root.sg_2.d1.vector8"; - - public static void main(String[] args) - throws IoTDBConnectionException, StatementExecutionException { - session = new Session("127.0.0.1", 6667, "root", "root"); - session.open(false); - - // set session fetchSize - session.setFetchSize(10000); - - // createTemplate(); - createAlignedTimeseries(); - - insertAlignedRecord(); - // insertAlignedRecords(); - // insertAlignedRecordsOfOneDevice(); - - // insertAlignedStringRecord(); - // insertAlignedStringRecords(); - - // insertTabletWithAlignedTimeseriesMethod1(); - // insertTabletWithAlignedTimeseriesMethod2(); - // insertNullableTabletWithAlignedTimeseries(); - // insertTabletsWithAlignedTimeseries(); - session.executeNonQueryStatement("flush"); - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); - session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 5"); - System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 5"); - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); - session.executeNonQueryStatement("delete from root.sg_1.d1.s2 where time <= 3"); - System.out.println("execute sql delete from root.sg_1.d1.s2 where time <= 3"); - - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); - session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 10"); - System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 10"); - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); - - // selectWithValueFilterTest(); - // selectWithGroupByTest(); - // selectWithLastTest(); - - // selectWithAggregationTest(); - - // selectWithAlignByDeviceTest(); - - session.close(); - } - - private static void selectTest() throws StatementExecutionException, IoTDBConnectionException { - SessionDataSet dataSet = session.executeQueryStatement("select s1 from root.sg_1.d1"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - dataSet = session.executeQueryStatement("select * from root.sg_1.d1"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - } - - private static void selectWithAlignByDeviceTest() - throws StatementExecutionException, IoTDBConnectionException { - SessionDataSet dataSet = - session.executeQueryStatement("select * from root.sg_1 align by device"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - } - - private static void selectWithValueFilterTest() - throws StatementExecutionException, IoTDBConnectionException { - SessionDataSet dataSet = - session.executeQueryStatement("select s1 from root.sg_1.d1 where s1 > 3 and time < 9"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - dataSet = - session.executeQueryStatement( - "select * from root.sg_1.d1 where time < 8 and s1 > 3 and s2 > 5"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - } - - private static void selectWithAggregationTest() - throws StatementExecutionException, IoTDBConnectionException { - SessionDataSet dataSet = session.executeQueryStatement("select count(s1) from root.sg_1.d1"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - dataSet = session.executeQueryStatement("select count(*) from root.sg_1.d1"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - dataSet = - session.executeQueryStatement( - "select sum(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - } - - private static void selectWithGroupByTest() - throws StatementExecutionException, IoTDBConnectionException { - SessionDataSet dataSet = - session.executeQueryStatement( - "select count(s1) from root.sg_1.d1.vector GROUP BY ([1, 100), 20ms)"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - dataSet = - session.executeQueryStatement( - "select count(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000" - + " GROUP BY ([50, 100), 10ms)"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - } - - private static void selectWithLastTest() - throws StatementExecutionException, IoTDBConnectionException { - SessionDataSet dataSet = session.executeQueryStatement("select last s1 from root.sg_1.d1"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - dataSet = session.executeQueryStatement("select last * from root.sg_1.d1"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - } - - private static void selectWithLastTestWithoutValueFilter() - throws StatementExecutionException, IoTDBConnectionException { - SessionDataSet dataSet = - session.executeQueryStatement("select last s1 from root.sg_1.d1 where time >= 5"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - - dataSet = session.executeQueryStatement("select last * from root.sg_1.d1 where time >= 5"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - dataSet.closeOperationHandle(); - - dataSet = session.executeQueryStatement("select last * from root.sg_1.d1 where time >= 20"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - dataSet.closeOperationHandle(); - } - - private static void createAlignedTimeseries() - throws StatementExecutionException, IoTDBConnectionException { - List<String> measurements = new ArrayList<>(); - for (int i = 1; i <= 2; i++) { - measurements.add("s" + i); - } - List<TSDataType> dataTypes = new ArrayList<>(); - dataTypes.add(TSDataType.INT64); - dataTypes.add(TSDataType.INT32); - List<TSEncoding> encodings = new ArrayList<>(); - List<CompressionType> compressors = new ArrayList<>(); - for (int i = 1; i <= 2; i++) { - encodings.add(TSEncoding.RLE); - compressors.add(CompressionType.SNAPPY); - } - session.createAlignedTimeseries( - ROOT_SG1_D1, measurements, dataTypes, encodings, compressors, null, null, null); - } - - // be sure template is coordinate with tablet - private static void createTemplate() - throws StatementExecutionException, IoTDBConnectionException, IOException { - Template template = new Template("template1"); - InternalNode iNodeVector = new InternalNode("vector", true); - MeasurementNode mNodeS1 = - new MeasurementNode("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - MeasurementNode mNodeS2 = - new MeasurementNode("s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY); - - iNodeVector.addChild(mNodeS1); - iNodeVector.addChild(mNodeS2); - - template.addToTemplate(iNodeVector); - - session.createSchemaTemplate(template); - session.setSchemaTemplate("template1", "root.sg_1"); - } - - /** Method 1 for insert tablet with aligned timeseries */ - private static void insertTabletWithAlignedTimeseriesMethod1() - throws IoTDBConnectionException, StatementExecutionException { - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT32)); - - Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList); - long timestamp = 1; - - for (long row = 1; row < 100; row++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, timestamp); - tablet.addValue( - schemaList.get(0).getMeasurementId(), rowIndex, new SecureRandom().nextLong()); - tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, new SecureRandom().nextInt()); - - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertAlignedTablet(tablet, true); - tablet.reset(); - } - timestamp++; - } - - if (tablet.rowSize != 0) { - session.insertAlignedTablet(tablet); - tablet.reset(); - } - - session.executeNonQueryStatement("flush"); - } - - /** Method 2 for insert tablet with aligned timeseries */ - private static void insertTabletWithAlignedTimeseriesMethod2() - throws IoTDBConnectionException, StatementExecutionException { - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT32)); - - Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR2, schemaList); - long[] timestamps = tablet.timestamps; - Object[] values = tablet.values; - - for (long time = 100; time < 200; time++) { - int row = tablet.rowSize++; - timestamps[row] = time; - - long[] sensor1 = (long[]) values[0]; - sensor1[row] = new SecureRandom().nextLong(); - - int[] sensor2 = (int[]) values[1]; - sensor2[row] = new SecureRandom().nextInt(); - - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertAlignedTablet(tablet, true); - tablet.reset(); - } - } - - if (tablet.rowSize != 0) { - session.insertAlignedTablet(tablet, true); - tablet.reset(); - } - - session.executeNonQueryStatement("flush"); - } - - private static void insertNullableTabletWithAlignedTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT32)); - - Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR3, schemaList); - - long[] timestamps = tablet.timestamps; - Object[] values = tablet.values; - // Use the bitMap to mark the null value point - BitMap[] bitMaps = new BitMap[values.length]; - tablet.bitMaps = bitMaps; - - bitMaps[1] = new BitMap(tablet.getMaxRowNumber()); - for (long time = 200; time < 300; time++) { - int row = tablet.rowSize++; - timestamps[row] = time; - - long[] sensor1 = (long[]) values[0]; - sensor1[row] = new SecureRandom().nextLong(); - - int[] sensor2 = (int[]) values[1]; - sensor2[row] = new SecureRandom().nextInt(); - - // mark this point as null value - if (time % 5 == 0) { - bitMaps[1].mark(row); - } - - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertAlignedTablet(tablet, true); - tablet.reset(); - bitMaps[1].reset(); - } - } - - if (tablet.rowSize != 0) { - session.insertAlignedTablet(tablet, true); - tablet.reset(); - } - - session.executeNonQueryStatement("flush"); - } - - private static void insertAlignedRecord() - throws IoTDBConnectionException, StatementExecutionException { - // first file we have both sensots' data - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - types.add(TSDataType.INT64); - types.add(TSDataType.INT32); - - for (long time = 0; time < 10; time++) { - List<Object> values = new ArrayList<>(); - values.add(time); - values.add((int) time); - session.insertAlignedRecord(ROOT_SG1_D1, time, measurements, types, values); - } - session.executeNonQueryStatement("flush"); - // second file we only have s1's data - measurements.clear(); - types.clear(); - measurements.add("s1"); - types.add(TSDataType.INT64); - for (long time = 10; time < 20; time++) { - List<Object> values = new ArrayList<>(); - values.add(time); - session.insertAlignedRecord(ROOT_SG1_D1, time, measurements, types, values); - } - } - - private static void insertAlignedStringRecord() - throws IoTDBConnectionException, StatementExecutionException { - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - - for (long time = 0; time < 1; time++) { - List<String> values = new ArrayList<>(); - values.add("3"); - values.add("4"); - session.insertAlignedRecord(ROOT_SG2_D1_VECTOR5, time, measurements, values); - } - } - - private static void insertAlignedRecords() - throws IoTDBConnectionException, StatementExecutionException { - List<String> deviceIds = new ArrayList<>(); - List<List<String>> measurementsList = new ArrayList<>(); - List<List<TSDataType>> typeList = new ArrayList<>(); - List<Long> times = new ArrayList<>(); - List<List<Object>> valueList = new ArrayList<>(); - - for (long time = 1; time < 5; time++) { - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - - List<TSDataType> types = new ArrayList<>(); - types.add(TSDataType.INT64); - types.add(TSDataType.INT32); - - List<Object> values = new ArrayList<>(); - values.add(1L); - values.add(2); - - deviceIds.add(ROOT_SG2_D1_VECTOR4); - times.add(time); - measurementsList.add(measurements); - typeList.add(types); - valueList.add(values); - } - session.insertAlignedRecords(deviceIds, times, measurementsList, typeList, valueList); - } - - private static void insertAlignedStringRecords() - throws IoTDBConnectionException, StatementExecutionException { - List<String> deviceIds = new ArrayList<>(); - List<List<String>> measurementsList = new ArrayList<>(); - List<Long> times = new ArrayList<>(); - List<List<String>> valueList = new ArrayList<>(); - - for (long time = 1; time < 5; time++) { - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - - List<String> values = new ArrayList<>(); - values.add("3"); - values.add("4"); - - deviceIds.add(ROOT_SG2_D1_VECTOR5); - times.add(time); - measurementsList.add(measurements); - valueList.add(values); - } - session.insertAlignedRecords(deviceIds, times, measurementsList, valueList); - } - - private static void insertAlignedRecordsOfOneDevice() - throws IoTDBConnectionException, StatementExecutionException { - List<List<String>> measurementsList = new ArrayList<>(); - List<List<TSDataType>> typeList = new ArrayList<>(); - List<Long> times = new ArrayList<>(); - List<List<Object>> valueList = new ArrayList<>(); - - for (long time = 10; time < 15; time++) { - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - - List<TSDataType> types = new ArrayList<>(); - types.add(TSDataType.INT64); - types.add(TSDataType.INT32); - - List<Object> values = new ArrayList<>(); - values.add(1L); - values.add(2); - - times.add(time); - measurementsList.add(measurements); - typeList.add(types); - valueList.add(values); - } - session.insertAlignedRecordsOfOneDevice( - ROOT_SG2_D1_VECTOR4, times, measurementsList, typeList, valueList); - } - - private static void insertTabletsWithAlignedTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - - List<MeasurementSchema> schemaList1 = new ArrayList<>(); - schemaList1.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList1.add(new MeasurementSchema("s2", TSDataType.INT64)); - - List<MeasurementSchema> schemaList2 = new ArrayList<>(); - schemaList2.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList2.add(new MeasurementSchema("s2", TSDataType.INT64)); - - List<MeasurementSchema> schemaList3 = new ArrayList<>(); - schemaList3.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList3.add(new MeasurementSchema("s2", TSDataType.INT64)); - - Tablet tablet1 = new Tablet(ROOT_SG2_D1_VECTOR6, schemaList1, 100); - Tablet tablet2 = new Tablet(ROOT_SG2_D1_VECTOR7, schemaList2, 100); - Tablet tablet3 = new Tablet(ROOT_SG2_D1_VECTOR8, schemaList3, 100); - - Map<String, Tablet> tabletMap = new HashMap<>(); - tabletMap.put(ROOT_SG2_D1_VECTOR6, tablet1); - tabletMap.put(ROOT_SG2_D1_VECTOR7, tablet2); - tabletMap.put(ROOT_SG2_D1_VECTOR8, tablet3); - - // Method 1 to add tablet data - long timestamp = System.currentTimeMillis(); - for (long row = 0; row < 100; row++) { - int row1 = tablet1.rowSize++; - int row2 = tablet2.rowSize++; - int row3 = tablet3.rowSize++; - tablet1.addTimestamp(row1, timestamp); - tablet2.addTimestamp(row2, timestamp); - tablet3.addTimestamp(row3, timestamp); - for (int i = 0; i < 2; i++) { - long value = new SecureRandom().nextLong(); - tablet1.addValue(schemaList1.get(i).getMeasurementId(), row1, value); - tablet2.addValue(schemaList2.get(i).getMeasurementId(), row2, value); - tablet3.addValue(schemaList3.get(i).getMeasurementId(), row3, value); - } - if (tablet1.rowSize == tablet1.getMaxRowNumber()) { - session.insertAlignedTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - timestamp++; - } - - if (tablet1.rowSize != 0) { - session.insertAlignedTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - - // Method 2 to add tablet data - long[] timestamps1 = tablet1.timestamps; - Object[] values1 = tablet1.values; - long[] timestamps2 = tablet2.timestamps; - Object[] values2 = tablet2.values; - long[] timestamps3 = tablet3.timestamps; - Object[] values3 = tablet3.values; - - for (long time = 0; time < 100; time++) { - int row1 = tablet1.rowSize++; - int row2 = tablet2.rowSize++; - int row3 = tablet3.rowSize++; - timestamps1[row1] = time; - timestamps2[row2] = time; - timestamps3[row3] = time; - for (int i = 0; i < 2; i++) { - long[] sensor1 = (long[]) values1[i]; - sensor1[row1] = i; - long[] sensor2 = (long[]) values2[i]; - sensor2[row2] = i; - long[] sensor3 = (long[]) values3[i]; - sensor3[row3] = i; - } - if (tablet1.rowSize == tablet1.getMaxRowNumber()) { - session.insertAlignedTablets(tabletMap, true); - - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - } - - if (tablet1.rowSize != 0) { - session.insertAlignedTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - } -} diff --git a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java deleted file mode 100644 index a3b8bf44ff..0000000000 --- a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb; - -import org.apache.iotdb.isession.SessionDataSet.DataIterator; -import org.apache.iotdb.isession.pool.SessionDataSetWrapper; -import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.session.pool.SessionPool; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.write.record.Tablet; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -/** - * Migrate all data belongs to a path from one IoTDB to another IoTDB Each thread migrate one - * series, the concurrent thread can be configured by concurrency - * - * <p>This example is migrating all timeseries from a local IoTDB with 6667 port to a local IoTDB - * with 6668 port - */ -public class DataMigrationExample { - - // used to read data from the source IoTDB - private static SessionPool readerPool; - // used to write data into the destination IoTDB - private static SessionPool writerPool; - // concurrent thread of loading timeseries data - private static int concurrency = 5; - - public static void main(String[] args) - throws IoTDBConnectionException, StatementExecutionException, ExecutionException, - InterruptedException { - - ExecutorService executorService = Executors.newFixedThreadPool(2 * concurrency + 1); - - String path = "root"; - - if (args.length != 0) { - path = args[0]; - } - - readerPool = new SessionPool("127.0.0.1", 6667, "root", "root", concurrency); - writerPool = new SessionPool("127.0.0.1", 6668, "root", "root", concurrency); - - SessionDataSetWrapper schemaDataSet = - readerPool.executeQueryStatement("count timeseries " + path); - DataIterator schemaIter = schemaDataSet.iterator(); - int total; - if (schemaIter.next()) { - total = schemaIter.getInt(1); - System.out.println("Total timeseries: " + total); - } else { - System.out.println("Can not get timeseries schema"); - System.exit(1); - } - readerPool.closeResultSet(schemaDataSet); - - schemaDataSet = readerPool.executeQueryStatement("show timeseries " + path); - schemaIter = schemaDataSet.iterator(); - - List<Future> futureList = new ArrayList<>(); - int count = 0; - while (schemaIter.next()) { - count++; - Path currentPath = new Path(schemaIter.getString("Timeseries"), true); - Future future = - executorService.submit( - new LoadThread( - count, currentPath, TSDataType.valueOf(schemaIter.getString("DataType")))); - futureList.add(future); - } - readerPool.closeResultSet(schemaDataSet); - - for (Future future : futureList) { - future.get(); - } - executorService.shutdown(); - - readerPool.close(); - writerPool.close(); - } - - static class LoadThread implements Callable<Void> { - - String device; - String measurement; - Path series; - TSDataType dataType; - Tablet tablet; - int i; - - public LoadThread(int i, Path series, TSDataType dataType) { - this.i = i; - this.device = series.getDevice(); - this.measurement = series.getMeasurement(); - this.dataType = dataType; - this.series = series; - } - - @Override - public Void call() { - - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema(measurement, dataType)); - tablet = new Tablet(device, schemaList, 300000); - SessionDataSetWrapper dataSet = null; - - try { - - dataSet = - readerPool.executeQueryStatement( - String.format("select %s from %s", measurement, device)); - - DataIterator dataIter = dataSet.iterator(); - while (dataIter.next()) { - int row = tablet.rowSize++; - tablet.timestamps[row] = dataIter.getLong(1); - switch (dataType) { - case BOOLEAN: - ((boolean[]) tablet.values[0])[row] = dataIter.getBoolean(2); - break; - case INT32: - ((int[]) tablet.values[0])[row] = dataIter.getInt(2); - break; - case INT64: - ((long[]) tablet.values[0])[row] = dataIter.getLong(2); - break; - case FLOAT: - ((float[]) tablet.values[0])[row] = dataIter.getFloat(2); - break; - case DOUBLE: - ((double[]) tablet.values[0])[row] = dataIter.getDouble(2); - break; - case TEXT: - ((Binary[]) tablet.values[0])[row] = new Binary(dataIter.getString(2)); - break; - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - writerPool.insertTablet(tablet, true); - tablet.reset(); - } - } - if (tablet.rowSize != 0) { - writerPool.insertTablet(tablet); - tablet.reset(); - } - - } catch (Exception e) { - System.out.println( - "Loading the " + i + "-th timeseries: " + series + " failed " + e.getMessage()); - return null; - } finally { - readerPool.closeResultSet(dataSet); - } - - System.out.println("Loading the " + i + "-th timeseries: " + series + " success"); - return null; - } - } -} diff --git a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java deleted file mode 100644 index b24dd725f8..0000000000 --- a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb; - -import org.apache.iotdb.isession.util.Version; -import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.session.Session; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; - -import java.util.ArrayList; -import java.util.List; - -@SuppressWarnings("squid:S106") -public class FastInsertExample { - - private static Session session; - private static Session sessionEnableRedirect; - private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1"; - private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2"; - private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3"; - private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4"; - private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5"; - private static final String ROOT_SG1_D1 = "root.sg1.d1"; - private static final String LOCAL_HOST = "127.0.0.1"; - - public static void main(String[] args) - throws IoTDBConnectionException, StatementExecutionException { - session = - new Session.Builder() - .host(LOCAL_HOST) - .port(6667) - .username("root") - .password("root") - .version(Version.V_1_0) - .build(); - session.open(false); - - fastInsertRecords(); - session.close(); - } - - private static void fastInsertRecords() - throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> deviceIds = new ArrayList<>(); - List<List<Object>> valuesList = new ArrayList<>(); - List<Long> timestamps = new ArrayList<>(); - List<List<TSDataType>> typesList = new ArrayList<>(); - - for (long time = 1000; time < 1500; time++) { - List<Object> values = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - deviceIds.add(deviceId); - valuesList.add(values); - typesList.add(types); - timestamps.add(time); - if (time != 0 && time % 100 == 0) { - session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList); - deviceIds.clear(); - valuesList.clear(); - typesList.clear(); - timestamps.clear(); - } - } - - session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList); - } -} diff --git a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java deleted file mode 100644 index 86184f677b..0000000000 --- a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb; - -import org.apache.iotdb.isession.SessionDataSet; -import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.session.Session; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.write.record.Tablet; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - -import java.util.ArrayList; -import java.util.List; - -/** - * This example shows how to insert and select Hybrid Timeseries by session Hybrid Timeseries - * includes Aligned Timeseries and Normal Timeseries - */ -public class HybridTimeseriesSessionExample { - - private static Session session; - private static final String ROOT_SG1_ALIGNEDDEVICE = "root.sg_1.aligned_device"; - private static final String ROOT_SG1_D1 = "root.sg_1.d1"; - private static final String ROOT_SG1_D2 = "root.sg_1.d2"; - - public static void main(String[] args) - throws IoTDBConnectionException, StatementExecutionException { - session = new Session("127.0.0.1", 6667, "root", "root"); - session.open(false); - - // set session fetchSize - session.setFetchSize(10000); - - insertRecord(ROOT_SG1_D2, 0, 100); - insertTabletWithAlignedTimeseriesMethod(0, 100); - insertRecord(ROOT_SG1_D1, 0, 100); - session.executeNonQueryStatement("flush"); - selectTest(); - - session.close(); - } - - private static void selectTest() throws StatementExecutionException, IoTDBConnectionException { - SessionDataSet dataSet = session.executeQueryStatement("select ** from root.sg_1"); - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - - dataSet.closeOperationHandle(); - } - /** Method 1 for insert tablet with aligned timeseries */ - private static void insertTabletWithAlignedTimeseriesMethod(int minTime, int maxTime) - throws IoTDBConnectionException, StatementExecutionException { - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT32)); - - Tablet tablet = new Tablet(ROOT_SG1_ALIGNEDDEVICE, schemaList); - long timestamp = minTime; - - for (long row = minTime; row < maxTime; row++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, timestamp); - tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, row * 10 + 1L); - tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, (int) (row * 10 + 2)); - - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertAlignedTablet(tablet, true); - tablet.reset(); - } - timestamp++; - } - - if (tablet.rowSize != 0) { - session.insertAlignedTablet(tablet); - tablet.reset(); - } - } - - private static void insertRecord(String deviceId, int minTime, int maxTime) - throws IoTDBConnectionException, StatementExecutionException { - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - measurements.add("s2"); - measurements.add("s4"); - measurements.add("s5"); - measurements.add("s6"); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = minTime; time < maxTime; time++) { - List<Object> values = new ArrayList<>(); - values.add(time * 10 + 3L); - values.add(time * 10 + 4L); - values.add(time * 10 + 5L); - values.add(time * 10 + 6L); - session.insertRecord(deviceId, time, measurements, types, values); - } - } -} diff --git a/example/session/src/main/java/org/apache/iotdb/ReadTest.java b/example/session/src/main/java/org/apache/iotdb/ReadTest.java new file mode 100644 index 0000000000..90bed6b8cd --- /dev/null +++ b/example/session/src/main/java/org/apache/iotdb/ReadTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb; + +import org.apache.iotdb.isession.SessionDataSet.DataIterator; +import org.apache.iotdb.isession.pool.SessionDataSetWrapper; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +public class ReadTest { + + private static SessionPool sessionPool; + + private static final Logger LOGGER = LoggerFactory.getLogger(ReadTest.class); + + private static int THREAD_NUMBER = 100; + + private static int DEVICE_NUMBER = 20000; + + private static int SENSOR_NUMBER = 500; + + private static int READ_LOOP = 10000000; + + private static long LOOP_INTERVAL_IN_NS = 3_000_000_000L; + + private static List<String> measurements; + + private static List<TSDataType> types; + + private static AtomicInteger totalRowNumber = new AtomicInteger(); + + private static Random r; + + /** Build a custom SessionPool for this example */ + + /** Build a redirect-able SessionPool for this example */ + private static void constructRedirectSessionPool() { + List<String> nodeUrls = new ArrayList<>(); + // nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("192.168.130.16:6667"); + nodeUrls.add("192.168.130.17:6667"); + nodeUrls.add("192.168.130.18:6667"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(500) + .build(); + sessionPool.setFetchSize(10000); + } + + private static class SyncReadSignal { + protected volatile boolean needResetLatch = true; + protected CountDownLatch latch; + protected long totalCost; + protected long currentTimestamp; + protected int count; + protected String queryName; + + protected SyncReadSignal(int count, String queryName) { + this.count = count; + this.queryName = queryName; + } + + protected void syncCountDownBeforeRead() { + if (needResetLatch) { + synchronized (this) { + if (needResetLatch) { + latch = new CountDownLatch(this.count); + needResetLatch = false; + totalCost = 0L; + currentTimestamp = System.nanoTime(); + } + } + } + } + + protected void finishReadAndWait(long cost, int loopIndex) throws InterruptedException { + CountDownLatch currentLatch = latch; + totalCost += cost; + synchronized (this) { + currentLatch.countDown(); + if (currentLatch.getCount() == 0) { + needResetLatch = true; + long totalCost = (System.nanoTime() - currentTimestamp); + LOGGER.info( + String.format( + "[%s][%d] finished with %d thread. AVG COST: %.3fms. TOTAL COST: %.3fms", + this.queryName, + loopIndex, + this.count, + this.totalCost * 1.0 / this.count / 1_000_000, + totalCost * 1.0 / 1_000_000)); + if (totalCost < LOOP_INTERVAL_IN_NS) { + Thread.sleep((LOOP_INTERVAL_IN_NS - totalCost) / 1000_000); + } + } + } + currentLatch.await(); + } + } + + public static void main(String[] args) throws InterruptedException { + // Choose the SessionPool you going to use + constructRedirectSessionPool(); + + r = new Random(); + + // Run last query + SyncReadSignal lastQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Last Value Query"); + Thread[] lastReadThreads = new Thread[THREAD_NUMBER]; + for (int i = 0; i < THREAD_NUMBER; i++) { + lastReadThreads[i] = + new Thread( + new ReaderThread(lastQuerySignal) { + @Override + protected void executeQuery() + throws IoTDBConnectionException, StatementExecutionException { + queryLastValue(); + } + }); + } + for (Thread thread : lastReadThreads) { + thread.start(); + } + + // Run raw query + SyncReadSignal rawQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Raw Value Query"); + Thread[] rawReadThreads = new Thread[THREAD_NUMBER]; + for (int i = 0; i < THREAD_NUMBER; i++) { + rawReadThreads[i] = + new Thread( + new ReaderThread(rawQuerySignal) { + @Override + protected void executeQuery() + throws IoTDBConnectionException, StatementExecutionException { + queryRawValue(); + } + }); + } + for (Thread thread : rawReadThreads) { + thread.start(); + } + + // Run avg query + SyncReadSignal avgQuerySignal = new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min"); + Thread[] avgReadThreads = new Thread[THREAD_NUMBER]; + for (int i = 0; i < THREAD_NUMBER; i++) { + avgReadThreads[i] = + new Thread( + new ReaderThread(avgQuerySignal) { + @Override + protected void executeQuery() + throws IoTDBConnectionException, StatementExecutionException { + queryAvgValueGroupBy5Min(); + } + }); + } + for (Thread thread : avgReadThreads) { + thread.start(); + } + + for (Thread thread : avgReadThreads) { + thread.join(); + } + } + + private abstract static class ReaderThread implements Runnable { + private final SyncReadSignal signal; + + protected ReaderThread(SyncReadSignal signal) { + this.signal = signal; + } + + @Override + public void run() { + for (int i = 0; i < READ_LOOP; i++) { + long cost = 10_000_000L; + signal.syncCountDownBeforeRead(); + try { + long startTime = System.nanoTime(); + executeQuery(); + cost = System.nanoTime() - startTime; + } catch (Throwable t) { + LOGGER.error("error when execute query.", t); + } finally { + try { + signal.finishReadAndWait(cost, i); + } catch (InterruptedException e) { + LOGGER.error("error when finish signal.", e); + } + } + } + } + + protected abstract void executeQuery() + throws IoTDBConnectionException, StatementExecutionException; + } + + private static void queryLastValue() + throws IoTDBConnectionException, StatementExecutionException { + int device = r.nextInt(DEVICE_NUMBER); + String sql = "select last(s_1) from root.test.g_0.d_" + device; + executeQuery(sql); + } + + private static void queryRawValue() throws IoTDBConnectionException, StatementExecutionException { + int device = r.nextInt(DEVICE_NUMBER); + String sql = String.format("select s_1 from root.test.g_0.d_%s limit 1 offset 10", device); + executeQuery(sql); + } + + private static void queryAvgValueGroupBy5Min() + throws IoTDBConnectionException, StatementExecutionException { + int device = r.nextInt(DEVICE_NUMBER); + String sql = + String.format( + "select avg(s_1) from root.test.g_0.d_%s GROUP BY ([now()-1d, now()), 5m)", device); + executeQuery(sql); + } + + private static void executeQuery(String sql) + throws IoTDBConnectionException, StatementExecutionException { + SessionDataSetWrapper wrapper = null; + try { + wrapper = sessionPool.executeQueryStatement(sql); + // get DataIterator like JDBC + DataIterator dataIterator = wrapper.iterator(); + while (dataIterator.next()) { + for (String columnName : wrapper.getColumnNames()) { + dataIterator.getString(columnName); + } + } + } finally { + // remember to close data set finally! + if (wrapper != null) { + sessionPool.closeResultSet(wrapper); + } + } + } +} diff --git a/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java b/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java deleted file mode 100644 index 5011272f65..0000000000 --- a/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb; - -import org.apache.iotdb.isession.template.Template; -import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.session.Session; -import org.apache.iotdb.session.template.MeasurementNode; -import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.write.record.Tablet; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class SessionConcurrentExample { - - private static final int sgNum = 20; - private static final int deviceNum = 100; - private static final int parallelDegreeForOneSG = 3; - - public static void main(String[] args) - throws IoTDBConnectionException, StatementExecutionException, IOException { - - Session session = new Session("127.0.0.1", 6667, "root", "root"); - session.open(false); - createTemplate(session); - session.close(); - - CountDownLatch latch = new CountDownLatch(sgNum * parallelDegreeForOneSG); - ExecutorService es = Executors.newFixedThreadPool(sgNum * parallelDegreeForOneSG); - - for (int i = 0; i < sgNum * parallelDegreeForOneSG; i++) { - int currentIndex = i; - es.execute(() -> concurrentOperation(latch, currentIndex)); - } - - es.shutdown(); - - try { - latch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - private static void concurrentOperation(CountDownLatch latch, int currentIndex) { - - Session session = new Session("127.0.0.1", 6667, "root", "root"); - try { - session.open(false); - } catch (IoTDBConnectionException e) { - e.printStackTrace(); - } - - for (int j = 0; j < deviceNum; j++) { - try { - insertTablet( - session, String.format("root.sg_%d.d_%d", currentIndex / parallelDegreeForOneSG, j)); - } catch (IoTDBConnectionException | StatementExecutionException e) { - e.printStackTrace(); - } - } - - try { - session.close(); - } catch (IoTDBConnectionException e) { - e.printStackTrace(); - } - - latch.countDown(); - } - - private static void createTemplate(Session session) - throws IoTDBConnectionException, StatementExecutionException, IOException { - - Template template = new Template("template1", false); - MeasurementNode mNodeS1 = - new MeasurementNode("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - MeasurementNode mNodeS2 = - new MeasurementNode("s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - MeasurementNode mNodeS3 = - new MeasurementNode("s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - - template.addToTemplate(mNodeS1); - template.addToTemplate(mNodeS2); - template.addToTemplate(mNodeS3); - - session.createSchemaTemplate(template); - for (int i = 0; i < sgNum; i++) { - session.setSchemaTemplate("template1", "root.sg_" + i); - } - } - - /** - * insert the data of a device. For each timestamp, the number of measurements is the same. - * - * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize - */ - private static void insertTablet(Session session, String deviceId) - throws IoTDBConnectionException, StatementExecutionException { - /* - * A Tablet example: - * device1 - * time s1, s2, s3 - * 1, 1, 1, 1 - * 2, 2, 2, 2 - * 3, 3, 3, 3 - */ - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); - - Tablet tablet = new Tablet(deviceId, schemaList, 100); - - // Method 1 to add tablet data - long timestamp = System.currentTimeMillis(); - - for (long row = 0; row < 100; row++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, timestamp); - for (int s = 0; s < 3; s++) { - long value = new Random().nextLong(); - tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - timestamp++; - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - - // Method 2 to add tablet data - long[] timestamps = tablet.timestamps; - Object[] values = tablet.values; - - for (long time = 0; time < 100; time++) { - int row = tablet.rowSize++; - timestamps[row] = time; - for (int i = 0; i < 3; i++) { - long[] sensor = (long[]) values[i]; - sensor[row] = i; - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - } -} diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java deleted file mode 100644 index 953b7fd1da..0000000000 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ /dev/null @@ -1,879 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb; - -import org.apache.iotdb.common.rpc.thrift.TAggregationType; -import org.apache.iotdb.isession.SessionDataSet; -import org.apache.iotdb.isession.SessionDataSet.DataIterator; -import org.apache.iotdb.isession.template.Template; -import org.apache.iotdb.isession.util.Version; -import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.iotdb.session.Session; -import org.apache.iotdb.session.template.MeasurementNode; -import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.utils.BitMap; -import org.apache.iotdb.tsfile.write.record.Tablet; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - -@SuppressWarnings("squid:S106") -public class SessionExample { - - private static Session session; - private static Session sessionEnableRedirect; - private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1"; - private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2"; - private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3"; - private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4"; - private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5"; - private static final String ROOT_SG1_D1 = "root.sg1.d1"; - private static final String LOCAL_HOST = "127.0.0.1"; - - public static void main(String[] args) - throws IoTDBConnectionException, StatementExecutionException { - session = - new Session.Builder() - .host(LOCAL_HOST) - .port(6667) - .username("root") - .password("root") - .version(Version.V_1_0) - .build(); - session.open(false); - - // set session fetchSize - session.setFetchSize(10000); - - try { - session.createDatabase("root.sg1"); - } catch (StatementExecutionException e) { - if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { - throw e; - } - } - - // createTemplate(); - createTimeseries(); - createMultiTimeseries(); - insertRecord(); - insertTablet(); - // insertTabletWithNullValues(); - // insertTablets(); - // insertRecords(); - // insertText(); - // selectInto(); - // createAndDropContinuousQueries(); - // nonQuery(); - query(); - // queryWithTimeout(); - rawDataQuery(); - lastDataQuery(); - aggregationQuery(); - groupByQuery(); - // queryByIterator(); - // deleteData(); - // deleteTimeseries(); - // setTimeout(); - - sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); - sessionEnableRedirect.setEnableQueryRedirection(true); - sessionEnableRedirect.open(false); - - // set session fetchSize - sessionEnableRedirect.setFetchSize(10000); - - insertRecord4Redirect(); - query4Redirect(); - sessionEnableRedirect.close(); - session.close(); - } - - private static void createAndDropContinuousQueries() - throws StatementExecutionException, IoTDBConnectionException { - session.executeNonQueryStatement( - "CREATE CONTINUOUS QUERY cq1 " - + "BEGIN SELECT max_value(s1) INTO temperature_max FROM root.sg1.* " - + "GROUP BY time(10s) END"); - session.executeNonQueryStatement( - "CREATE CONTINUOUS QUERY cq2 " - + "BEGIN SELECT count(s2) INTO temperature_cnt FROM root.sg1.* " - + "GROUP BY time(10s), level=1 END"); - session.executeNonQueryStatement( - "CREATE CONTINUOUS QUERY cq3 " - + "RESAMPLE EVERY 20s FOR 20s " - + "BEGIN SELECT avg(s3) INTO temperature_avg FROM root.sg1.* " - + "GROUP BY time(10s), level=1 END"); - session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq1"); - session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq2"); - session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq3"); - } - - private static void createTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) { - session.createTimeseries( - ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - } - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S2)) { - session.createTimeseries( - ROOT_SG1_D1_S2, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - } - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S3)) { - session.createTimeseries( - ROOT_SG1_D1_S3, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - } - - // create timeseries with tags and attributes - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S4)) { - Map<String, String> tags = new HashMap<>(); - tags.put("tag1", "v1"); - Map<String, String> attributes = new HashMap<>(); - attributes.put("description", "v1"); - session.createTimeseries( - ROOT_SG1_D1_S4, - TSDataType.INT64, - TSEncoding.RLE, - CompressionType.SNAPPY, - null, - tags, - attributes, - "temperature"); - } - - // create timeseries with SDT property, SDT will take place when flushing - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S5)) { - // COMPDEV is required - // COMPMAXTIME and COMPMINTIME are optional and their unit is ms - Map<String, String> props = new HashMap<>(); - props.put("LOSS", "sdt"); - props.put("COMPDEV", "0.01"); - props.put("COMPMINTIME", "2"); - props.put("COMPMAXTIME", "10"); - session.createTimeseries( - ROOT_SG1_D1_S5, - TSDataType.INT64, - TSEncoding.RLE, - CompressionType.SNAPPY, - props, - null, - null, - null); - } - } - - private static void createMultiTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - - if (!session.checkTimeseriesExists("root.sg1.d2.s1") - && !session.checkTimeseriesExists("root.sg1.d2.s2")) { - List<String> paths = new ArrayList<>(); - paths.add("root.sg1.d2.s1"); - paths.add("root.sg1.d2.s2"); - List<TSDataType> tsDataTypes = new ArrayList<>(); - tsDataTypes.add(TSDataType.INT64); - tsDataTypes.add(TSDataType.INT64); - List<TSEncoding> tsEncodings = new ArrayList<>(); - tsEncodings.add(TSEncoding.RLE); - tsEncodings.add(TSEncoding.RLE); - List<CompressionType> compressionTypes = new ArrayList<>(); - compressionTypes.add(CompressionType.SNAPPY); - compressionTypes.add(CompressionType.SNAPPY); - - List<Map<String, String>> tagsList = new ArrayList<>(); - Map<String, String> tags = new HashMap<>(); - tags.put("unit", "kg"); - tagsList.add(tags); - tagsList.add(tags); - - List<Map<String, String>> attributesList = new ArrayList<>(); - Map<String, String> attributes = new HashMap<>(); - attributes.put("minValue", "1"); - attributes.put("maxValue", "100"); - attributesList.add(attributes); - attributesList.add(attributes); - - List<String> alias = new ArrayList<>(); - alias.add("weight1"); - alias.add("weight2"); - - session.createMultiTimeseries( - paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList, attributesList, alias); - } - } - - private static void createTemplate() - throws IoTDBConnectionException, StatementExecutionException, IOException { - - Template template = new Template("template1", false); - MeasurementNode mNodeS1 = - new MeasurementNode("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - MeasurementNode mNodeS2 = - new MeasurementNode("s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - MeasurementNode mNodeS3 = - new MeasurementNode("s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - - template.addToTemplate(mNodeS1); - template.addToTemplate(mNodeS2); - template.addToTemplate(mNodeS3); - - session.createSchemaTemplate(template); - session.setSchemaTemplate("template1", "root.sg1"); - } - - private static void insertRecord() throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 100; time++) { - List<Object> values = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - session.insertRecord(deviceId, time, measurements, types, values); - } - } - - private static void insertRecord4Redirect() - throws IoTDBConnectionException, StatementExecutionException { - for (int i = 0; i < 6; i++) { - for (int j = 0; j < 2; j++) { - String deviceId = "root.redirect" + i + ".d" + j; - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - List<TSDataType> types = new ArrayList<>(); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 5; time++) { - List<Object> values = new ArrayList<>(); - values.add(1L + time); - values.add(2L + time); - values.add(3L + time); - session.insertRecord(deviceId, time, measurements, types, values); - } - } - } - } - - private static void insertStrRecord() - throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - - for (long time = 0; time < 10; time++) { - List<String> values = new ArrayList<>(); - values.add("1"); - values.add("2"); - values.add("3"); - session.insertRecord(deviceId, time, measurements, values); - } - } - - private static void insertRecordInObject() - throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 100; time++) { - session.insertRecord(deviceId, time, measurements, types, 1L, 1L, 1L); - } - } - - private static void insertRecords() throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - List<String> deviceIds = new ArrayList<>(); - List<List<String>> measurementsList = new ArrayList<>(); - List<List<Object>> valuesList = new ArrayList<>(); - List<Long> timestamps = new ArrayList<>(); - List<List<TSDataType>> typesList = new ArrayList<>(); - - for (long time = 0; time < 500; time++) { - List<Object> values = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - deviceIds.add(deviceId); - measurementsList.add(measurements); - valuesList.add(values); - typesList.add(types); - timestamps.add(time); - if (time != 0 && time % 100 == 0) { - session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); - deviceIds.clear(); - measurementsList.clear(); - valuesList.clear(); - typesList.clear(); - timestamps.clear(); - } - } - - session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); - } - - /** - * insert the data of a device. For each timestamp, the number of measurements is the same. - * - * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize - */ - private static void insertTablet() throws IoTDBConnectionException, StatementExecutionException { - /* - * A Tablet example: - * device1 - * time s1, s2, s3 - * 1, 1, 1, 1 - * 2, 2, 2, 2 - * 3, 3, 3, 3 - */ - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); - - Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100); - - // Method 1 to add tablet data - long timestamp = System.currentTimeMillis(); - - for (long row = 0; row < 100; row++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, timestamp); - for (int s = 0; s < 3; s++) { - long value = new Random().nextLong(); - tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - timestamp++; - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - - // Method 2 to add tablet data - long[] timestamps = tablet.timestamps; - Object[] values = tablet.values; - - for (long time = 0; time < 100; time++) { - int row = tablet.rowSize++; - timestamps[row] = time; - for (int i = 0; i < 3; i++) { - long[] sensor = (long[]) values[i]; - sensor[row] = i; - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - } - - private static void insertTabletWithNullValues() - throws IoTDBConnectionException, StatementExecutionException { - /* - * A Tablet example: - * device1 - * time s1, s2, s3 - * 1, null, 1, 1 - * 2, 2, null, 2 - * 3, 3, 3, null - */ - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); - - Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100); - - // Method 1 to add tablet data - tablet.initBitMaps(); - - long timestamp = System.currentTimeMillis(); - for (long row = 0; row < 100; row++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, timestamp); - for (int s = 0; s < 3; s++) { - long value = new Random().nextLong(); - // mark null value - if (row % 3 == s) { - tablet.bitMaps[s].mark((int) row); - } - tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - timestamp++; - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - - // Method 2 to add tablet data - long[] timestamps = tablet.timestamps; - Object[] values = tablet.values; - BitMap[] bitMaps = new BitMap[schemaList.size()]; - for (int s = 0; s < 3; s++) { - bitMaps[s] = new BitMap(tablet.getMaxRowNumber()); - } - tablet.bitMaps = bitMaps; - - for (long time = 0; time < 100; time++) { - int row = tablet.rowSize++; - timestamps[row] = time; - for (int i = 0; i < 3; i++) { - long[] sensor = (long[]) values[i]; - // mark null value - if (row % 3 == i) { - bitMaps[i].mark(row); - } - sensor[row] = i; - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - } - - private static void insertTablets() throws IoTDBConnectionException, StatementExecutionException { - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); - - Tablet tablet1 = new Tablet(ROOT_SG1_D1, schemaList, 100); - Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 100); - Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 100); - - Map<String, Tablet> tabletMap = new HashMap<>(); - tabletMap.put(ROOT_SG1_D1, tablet1); - tabletMap.put("root.sg1.d2", tablet2); - tabletMap.put("root.sg1.d3", tablet3); - - // Method 1 to add tablet data - long timestamp = System.currentTimeMillis(); - for (long row = 0; row < 100; row++) { - int row1 = tablet1.rowSize++; - int row2 = tablet2.rowSize++; - int row3 = tablet3.rowSize++; - tablet1.addTimestamp(row1, timestamp); - tablet2.addTimestamp(row2, timestamp); - tablet3.addTimestamp(row3, timestamp); - for (int i = 0; i < 3; i++) { - long value = new Random().nextLong(); - tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value); - tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value); - tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value); - } - if (tablet1.rowSize == tablet1.getMaxRowNumber()) { - session.insertTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - timestamp++; - } - - if (tablet1.rowSize != 0) { - session.insertTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - - // Method 2 to add tablet data - long[] timestamps1 = tablet1.timestamps; - Object[] values1 = tablet1.values; - long[] timestamps2 = tablet2.timestamps; - Object[] values2 = tablet2.values; - long[] timestamps3 = tablet3.timestamps; - Object[] values3 = tablet3.values; - - for (long time = 0; time < 100; time++) { - int row1 = tablet1.rowSize++; - int row2 = tablet2.rowSize++; - int row3 = tablet3.rowSize++; - timestamps1[row1] = time; - timestamps2[row2] = time; - timestamps3[row3] = time; - for (int i = 0; i < 3; i++) { - long[] sensor1 = (long[]) values1[i]; - sensor1[row1] = i; - long[] sensor2 = (long[]) values2[i]; - sensor2[row2] = i; - long[] sensor3 = (long[]) values3[i]; - sensor3[row3] = i; - } - if (tablet1.rowSize == tablet1.getMaxRowNumber()) { - session.insertTablets(tabletMap, true); - - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - } - - if (tablet1.rowSize != 0) { - session.insertTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - } - - /** - * This example shows how to insert data of TSDataType.TEXT. You can use the session interface to - * write data of String type or Binary type. - */ - private static void insertText() throws IoTDBConnectionException, StatementExecutionException { - String device = "root.sg1.text"; - // the first data is String type and the second data is Binary type - List<Object> datas = Arrays.asList("String", new Binary("Binary")); - // insertRecord example - for (int i = 0; i < datas.size(); i++) { - // write data of String type or Binary type - session.insertRecord( - device, - i, - Collections.singletonList("s1"), - Collections.singletonList(TSDataType.TEXT), - datas.get(i)); - } - - // insertTablet example - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s2", TSDataType.TEXT)); - Tablet tablet = new Tablet(device, schemaList, 100); - for (int i = 0; i < datas.size(); i++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, i); - // write data of String type or Binary type - tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, datas.get(i)); - } - session.insertTablet(tablet); - try (SessionDataSet dataSet = session.executeQueryStatement("select s1, s2 from " + device)) { - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void selectInto() throws IoTDBConnectionException, StatementExecutionException { - session.executeNonQueryStatement( - "select s1, s2, s3 into into_s1, into_s2, into_s3 from root.sg1.d1"); - - try (SessionDataSet dataSet = - session.executeQueryStatement("select into_s1, into_s2, into_s3 from root.sg1.d1")) { - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void deleteData() throws IoTDBConnectionException, StatementExecutionException { - String path = ROOT_SG1_D1_S1; - long deleteTime = 99; - session.deleteData(path, deleteTime); - } - - private static void deleteTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - session.deleteTimeseries(paths); - } - - private static void query() throws IoTDBConnectionException, StatementExecutionException { - try (SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1")) { - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void query4Redirect() - throws IoTDBConnectionException, StatementExecutionException { - String selectPrefix = "select * from root.redirect"; - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement(selectPrefix + i + ".d1")) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement( - selectPrefix + i + ".d1 where time >= 1 and time < 10")) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement( - selectPrefix + i + ".d1 where time >= 1 and time < 10 align by device")) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement( - selectPrefix - + i - + ".d1 where time >= 1 and time < 10 and root.redirect" - + i - + ".d1.s1 > 1")) { - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - } - - private static void queryWithTimeout() - throws IoTDBConnectionException, StatementExecutionException { - try (SessionDataSet dataSet = - session.executeQueryStatement("select * from root.sg1.d1", 2000)) { - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void rawDataQuery() throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - long startTime = 10L; - long endTime = 200L; - long timeOut = 60000; - - try (SessionDataSet dataSet = session.executeRawDataQuery(paths, startTime, endTime, timeOut)) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void lastDataQuery() throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - try (SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3, 60000)) { - System.out.println(sessionDataSet.getColumnNames()); - sessionDataSet.setFetchSize(1024); - while (sessionDataSet.hasNext()) { - System.out.println(sessionDataSet.next()); - } - } - } - - private static void aggregationQuery() - throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - - List<TAggregationType> aggregations = new ArrayList<>(); - aggregations.add(TAggregationType.COUNT); - aggregations.add(TAggregationType.SUM); - aggregations.add(TAggregationType.MAX_VALUE); - try (SessionDataSet sessionDataSet = session.executeAggregationQuery(paths, aggregations)) { - System.out.println(sessionDataSet.getColumnNames()); - sessionDataSet.setFetchSize(1024); - while (sessionDataSet.hasNext()) { - System.out.println(sessionDataSet.next()); - } - } - } - - private static void groupByQuery() throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - - List<TAggregationType> aggregations = new ArrayList<>(); - aggregations.add(TAggregationType.COUNT); - aggregations.add(TAggregationType.SUM); - aggregations.add(TAggregationType.MAX_VALUE); - try (SessionDataSet sessionDataSet = - session.executeAggregationQuery(paths, aggregations, 0, 100, 10, 20)) { - System.out.println(sessionDataSet.getColumnNames()); - sessionDataSet.setFetchSize(1024); - while (sessionDataSet.hasNext()) { - System.out.println(sessionDataSet.next()); - } - } - } - - private static void queryByIterator() - throws IoTDBConnectionException, StatementExecutionException { - try (SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1")) { - - DataIterator iterator = dataSet.iterator(); - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (iterator.next()) { - StringBuilder builder = new StringBuilder(); - // get time - builder.append(iterator.getLong(1)).append(","); - // get second column - if (!iterator.isNull(2)) { - builder.append(iterator.getLong(2)).append(","); - } else { - builder.append("null").append(","); - } - - // get third column - if (!iterator.isNull(ROOT_SG1_D1_S2)) { - builder.append(iterator.getLong(ROOT_SG1_D1_S2)).append(","); - } else { - builder.append("null").append(","); - } - - // get forth column - if (!iterator.isNull(4)) { - builder.append(iterator.getLong(4)).append(","); - } else { - builder.append("null").append(","); - } - - // get fifth column - if (!iterator.isNull(ROOT_SG1_D1_S4)) { - builder.append(iterator.getObject(ROOT_SG1_D1_S4)); - } else { - builder.append("null"); - } - - System.out.println(builder); - } - } - } - - private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException { - session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1)"); - } - - private static void setTimeout() throws StatementExecutionException, IoTDBConnectionException { - try (Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000)) { - tempSession.setQueryTimeout(60000); - } - } -} diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java deleted file mode 100644 index f3c70178ef..0000000000 --- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb; - -import org.apache.iotdb.isession.SessionDataSet.DataIterator; -import org.apache.iotdb.isession.pool.SessionDataSetWrapper; -import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.session.pool.SessionPool; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class SessionPoolExample { - - private static SessionPool sessionPool; - private static ExecutorService service; - - /** Build a custom SessionPool for this example */ - private static void constructCustomSessionPool() { - sessionPool = - new SessionPool.Builder() - .host("127.0.0.1") - .port(6667) - .user("root") - .password("root") - .maxSize(3) - .build(); - } - - /** Build a redirect-able SessionPool for this example */ - private static void constructRedirectSessionPool() { - List<String> nodeUrls = new ArrayList<>(); - nodeUrls.add("127.0.0.1:6667"); - nodeUrls.add("127.0.0.1:6668"); - sessionPool = - new SessionPool.Builder() - .nodeUrls(nodeUrls) - .user("root") - .password("root") - .maxSize(3) - .build(); - } - - public static void main(String[] args) - throws StatementExecutionException, IoTDBConnectionException, InterruptedException { - // Choose the SessionPool you going to use - constructRedirectSessionPool(); - - service = Executors.newFixedThreadPool(10); - insertRecord(); - queryByRowRecord(); - Thread.sleep(1000); - queryByIterator(); - sessionPool.close(); - service.shutdown(); - } - - // more insert example, see SessionExample.java - private static void insertRecord() throws StatementExecutionException, IoTDBConnectionException { - String deviceId = "root.sg1.d1"; - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 10; time++) { - List<Object> values = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - sessionPool.insertRecord(deviceId, time, measurements, types, values); - } - } - - private static void queryByRowRecord() { - for (int i = 0; i < 1; i++) { - service.submit( - () -> { - SessionDataSetWrapper wrapper = null; - try { - wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1"); - System.out.println(wrapper.getColumnNames()); - System.out.println(wrapper.getColumnTypes()); - while (wrapper.hasNext()) { - System.out.println(wrapper.next()); - } - } catch (IoTDBConnectionException | StatementExecutionException e) { - e.printStackTrace(); - } finally { - // remember to close data set finally! - sessionPool.closeResultSet(wrapper); - } - }); - } - } - - private static void queryByIterator() { - for (int i = 0; i < 1; i++) { - service.submit( - () -> { - SessionDataSetWrapper wrapper = null; - try { - wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1"); - // get DataIterator like JDBC - DataIterator dataIterator = wrapper.iterator(); - System.out.println(wrapper.getColumnNames()); - System.out.println(wrapper.getColumnTypes()); - while (dataIterator.next()) { - StringBuilder builder = new StringBuilder(); - for (String columnName : wrapper.getColumnNames()) { - builder.append(dataIterator.getString(columnName) + " "); - } - System.out.println(builder); - } - } catch (IoTDBConnectionException | StatementExecutionException e) { - e.printStackTrace(); - } finally { - // remember to close data set finally! - sessionPool.closeResultSet(wrapper); - } - }); - } - } -} diff --git a/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java b/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java deleted file mode 100644 index baf0074ece..0000000000 --- a/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb; - -import org.apache.iotdb.isession.SessionDataSet; -import org.apache.iotdb.isession.util.Version; -import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.iotdb.session.Session; -import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; - -import java.util.ArrayList; -import java.util.List; - -/** - * When using session API, measurement, device, database and path are represented by String. The - * content of the String should be the same as what you would write in a SQL statement. This class - * is an example to help you understand better. - */ -public class SyntaxConventionRelatedExample { - private static Session session; - private static final String LOCAL_HOST = "127.0.0.1"; - /** - * if you want to create a time series named root.sg1.select, a possible SQL statement would be - * like: create timeseries root.sg1.select with datatype=FLOAT, encoding=RLE As described before, - * when using session API, path is represented using String. The path should be written as - * "root.sg1.select". - */ - private static final String ROOT_SG1_KEYWORD_EXAMPLE = "root.sg1.select"; - - /** - * if you want to create a time series named root.sg1.111, a possible SQL statement would be like: - * create timeseries root.sg1.`111` with datatype=FLOAT, encoding=RLE The path should be written - * as "root.sg1.`111`". - */ - private static final String ROOT_SG1_DIGITS_EXAMPLE = "root.sg1.`111`"; - - /** - * if you want to create a time series named root.sg1.`a"b'c``, a possible SQL statement would be - * like: create timeseries root.sg1.`a"b'c``` with datatype=FLOAT, encoding=RLE The path should be - * written as "root.sg1.`a"b`c```". - */ - private static final String ROOT_SG1_SPECIAL_CHARACTER_EXAMPLE = "root.sg1.`a\"b'c```"; - - /** - * if you want to create a time series named root.sg1.a, a possible SQL statement would be like: - * create timeseries root.sg1.a with datatype=FLOAT, encoding=RLE The path should be written as - * "root.sg1.a". - */ - private static final String ROOT_SG1_NORMAL_NODE_EXAMPLE = "root.sg1.a"; - - public static void main(String[] args) - throws IoTDBConnectionException, StatementExecutionException { - session = - new Session.Builder() - .host(LOCAL_HOST) - .port(6667) - .username("root") - .password("root") - .version(Version.V_1_0) - .build(); - session.open(false); - - // set session fetchSize - session.setFetchSize(10000); - - try { - session.setStorageGroup("root.sg1"); - } catch (StatementExecutionException e) { - if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) { - throw e; - } - } - - // createTimeSeries - createTimeSeries(); - SessionDataSet dataSet = session.executeQueryStatement("show timeseries root.sg1.*"); - // the expected paths would be: - // [root.sg1.select, root.sg1.`111`, root.sg1.`a"b'c```, root.sg1.a] - // You could see that time series in dataSet are exactly the same as - // the initial String you used as path. Node names consist of digits or contain special - // characters are quoted with ``, both in SQL statement and in header of result dataset. - // It's convenient that you can use the result of show timeseries as input parameter directly - // for other - // session APIs such as insertRecord or executeRawDataQuery. - List<String> paths = new ArrayList<>(); - while (dataSet.hasNext()) { - paths.add(dataSet.next().getFields().get(0).toString()); - } - - long startTime = 1L; - long endTime = 100L; - long timeOut = 60000; - - try (SessionDataSet dataSet1 = - session.executeRawDataQuery(paths, startTime, endTime, timeOut)) { - - System.out.println(dataSet1.getColumnNames()); - dataSet1.setFetchSize(1024); - while (dataSet1.hasNext()) { - System.out.println(dataSet1.next()); - } - } - } - - private static void createTimeSeries() - throws IoTDBConnectionException, StatementExecutionException { - if (!session.checkTimeseriesExists(ROOT_SG1_KEYWORD_EXAMPLE)) { - session.createTimeseries( - ROOT_SG1_KEYWORD_EXAMPLE, TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY); - } - if (!session.checkTimeseriesExists(ROOT_SG1_DIGITS_EXAMPLE)) { - session.createTimeseries( - ROOT_SG1_DIGITS_EXAMPLE, TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY); - } - if (!session.checkTimeseriesExists(ROOT_SG1_SPECIAL_CHARACTER_EXAMPLE)) { - session.createTimeseries( - ROOT_SG1_SPECIAL_CHARACTER_EXAMPLE, - TSDataType.FLOAT, - TSEncoding.RLE, - CompressionType.SNAPPY); - } - if (!session.checkTimeseriesExists(ROOT_SG1_NORMAL_NODE_EXAMPLE)) { - session.createTimeseries( - ROOT_SG1_NORMAL_NODE_EXAMPLE, TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY); - } - } -} diff --git a/example/session/src/main/java/org/apache/iotdb/TabletExample.java b/example/session/src/main/java/org/apache/iotdb/TabletExample.java deleted file mode 100644 index 6cfd2491e8..0000000000 --- a/example/session/src/main/java/org/apache/iotdb/TabletExample.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb; - -import org.apache.iotdb.session.Session; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.write.record.Tablet; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -public class TabletExample { - - private static final String TIME_STR = "time"; - - /** - * load csv data. - * - * @param measureTSTypeInfos key: measurement name, value: measurement data type - * @param dataFileName the csv file name to load - * @return key: measurement name, value: series in format of {@link ArrayList} - * @throws IOException if the csv format is incorrect - */ - private static Map<String, ArrayList> loadCSVData( - Map<String, TSDataType> measureTSTypeInfos, String dataFileName) throws IOException { - measureTSTypeInfos.put(TIME_STR, TSDataType.INT64); - try (BufferedReader reader = new BufferedReader(new FileReader(dataFileName))) { - String headline = reader.readLine(); - if (headline == null) { - throw new IOException("Given csv data file has not headers"); - } - // check the csv file format - String[] fileColumns = headline.split(","); - Map<String, Integer> columnToIdMap = new HashMap<>(); - for (int col = 0; col < fileColumns.length; col++) { - String columnName = fileColumns[col]; - if (columnToIdMap.containsKey(columnName)) { - throw new IOException( - String.format("csv file contains duplicate columns: %s", columnName)); - } - columnToIdMap.put(columnName, col); - } - Map<String, ArrayList> ret = new HashMap<>(); - // make sure that all measurements can be found from the data file - for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) { - String measurement = entry.getKey(); - if (!columnToIdMap.containsKey(entry.getKey())) { - throw new IOException(String.format("measurement %s's is not in csv file.", measurement)); - } else { - ret.put(measurement, new ArrayList<>()); - } - } - - String line; - while ((line = reader.readLine()) != null) { - String[] items = line.split(","); - for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) { - String measurement = entry.getKey(); - TSDataType dataType = entry.getValue(); - int idx = columnToIdMap.get(measurement); - switch (dataType) { - case BOOLEAN: - ret.get(measurement).add(Boolean.parseBoolean(items[idx])); - break; - case INT32: - ret.get(measurement).add(Integer.parseInt(items[idx])); - break; - case INT64: - ret.get(measurement).add(Long.parseLong(items[idx])); - break; - case FLOAT: - ret.get(measurement).add(Float.parseFloat(items[idx])); - break; - case DOUBLE: - ret.get(measurement).add(Double.parseDouble(items[idx])); - break; - case TEXT: - ret.get(measurement).add(Binary.valueOf(items[idx])); - break; - case VECTOR: - throw new IOException(String.format("data type %s is not yet.", TSDataType.VECTOR)); - } - } - } - return ret; - } finally { - measureTSTypeInfos.remove(TIME_STR); - } - } - - /** - * Read csv file and insert tablet to IoTDB - * - * @param args: arg(with default value): arg0: dataFileName(sample.csv), arg1: rowSize(10000), - * arg2: colSize(5000). - */ - public static void main(String[] args) throws Exception { - - Session session = new Session("127.0.0.1", 6667, "root", "root"); - session.open(); - String dataFileName = "sample.csv"; - int rowSize = 10000; - int colSize = 5000; - if (args.length > 1) { - dataFileName = args[0]; - } - if (args.length > 2) { - rowSize = Integer.parseInt(args[1]); - } - if (args.length > 3) { - colSize = Integer.parseInt(args[2]); - } - - // construct the tablet's measurements. - Map<String, TSDataType> measureTSTypeInfos = new HashMap<>(); - measureTSTypeInfos.put("s0", TSDataType.BOOLEAN); - measureTSTypeInfos.put("s1", TSDataType.FLOAT); - measureTSTypeInfos.put("s2", TSDataType.INT32); - measureTSTypeInfos.put("s3", TSDataType.DOUBLE); - measureTSTypeInfos.put("s4", TSDataType.INT64); - measureTSTypeInfos.put("s5", TSDataType.TEXT); - List<MeasurementSchema> schemas = new ArrayList<>(); - measureTSTypeInfos.forEach((mea, type) -> schemas.add(new MeasurementSchema(mea, type))); - - System.out.println( - String.format( - "Test Java: csv file name: %s, row: %d, col: %d", dataFileName, rowSize, colSize)); - System.out.println(String.format("Total points: %d", rowSize * colSize * schemas.size())); - - // test start - long allStart = System.nanoTime(); - - Map<String, ArrayList> data = loadCSVData(measureTSTypeInfos, dataFileName); - long loadCost = System.nanoTime() - allStart; - - long insertCost = 0; - for (int i = 0; i < colSize; i++) { - String deviceId = "root.sg" + i % 8 + "." + i; - - Tablet ta = new Tablet(deviceId, schemas, rowSize); - ta.rowSize = rowSize; - for (int t = 0; t < ta.rowSize; t++) { - ta.addTimestamp(t, (Long) data.get(TIME_STR).get(t)); - for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) { - String mea = entry.getKey(); - ta.addValue(mea, t, data.get(mea).get(t)); - } - } - long insertSt = System.nanoTime(); - session.insertTablet(ta, false); - insertCost += (System.nanoTime() - insertSt); - } - // test end - long allEnd = System.nanoTime(); - - session.executeNonQueryStatement("delete timeseries root.*"); - session.close(); - - System.out.println(String.format("load cost: %.3f", ((float) loadCost / 1000_000_000))); - System.out.println( - String.format( - "construct tablet cost: %.3f", - ((float) (allEnd - allStart - insertCost - loadCost) / 1000_000_000))); - System.out.println( - String.format("insert tablet cost: %.3f", ((float) insertCost / 1000_000_000))); - System.out.println( - String.format("total cost: %.3f", ((float) (allEnd - allStart) / 1000_000_000))); - System.out.println(String.format("%.3f", ((float) loadCost / 1000_000_000))); - } -} diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTest.java b/example/session/src/main/java/org/apache/iotdb/WriteTest.java new file mode 100644 index 0000000000..343ae9797e --- /dev/null +++ b/example/session/src/main/java/org/apache/iotdb/WriteTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb; + +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class WriteTest { + + private static SessionPool sessionPool; + + private static final Logger LOGGER = LoggerFactory.getLogger(WriteTest.class); + + private static int THREAD_NUMBER = 300; + + private static int DEVICE_NUMBER = 20000; + + private static int SENSOR_NUMBER = 500; + + private static int WRITE_LOOP = 100000; + + private static List<String> measurements; + + private static List<TSDataType> types; + + private static AtomicInteger totalRowNumber = new AtomicInteger(); + + private static float[] floatData = new float[10000]; + + private static Random r; + + /** Build a custom SessionPool for this example */ + + /** Build a redirect-able SessionPool for this example */ + private static void constructRedirectSessionPool() { + List<String> nodeUrls = new ArrayList<>(); + // nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("192.168.130.16:6667"); + nodeUrls.add("192.168.130.17:6667"); + nodeUrls.add("192.168.130.18:6667"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(500) + .build(); + sessionPool.setFetchSize(10000); + } + + private static class SyncWriteSignal { + protected volatile boolean needResetLatch = true; + protected CountDownLatch latch; + protected long currentTimestamp; + + protected int count; + + protected SyncWriteSignal(int count) { + this.count = count; + } + + protected void syncCountDownBeforeInsert() { + if (needResetLatch) { + synchronized (this) { + if (needResetLatch) { + latch = new CountDownLatch(this.count); + needResetLatch = false; + currentTimestamp = System.currentTimeMillis(); + } + } + } + } + + protected void finishInsertAndWait(int loopIndex) throws InterruptedException { + CountDownLatch currentLatch = latch; + synchronized (this) { + currentLatch.countDown(); + if (currentLatch.getCount() == 0) { + needResetLatch = true; + LOGGER.info( + "write loop[{}] finished. cost: {}ms. total rows: {}. total points: {}", + loopIndex, + (System.currentTimeMillis() - currentTimestamp), + totalRowNumber.get(), + (long) totalRowNumber.get() * SENSOR_NUMBER); + } + } + currentLatch.await(); + } + } + + private static class InsertWorker implements Runnable { + private SyncWriteSignal signal; + private int index; + + protected InsertWorker(SyncWriteSignal signal, int index) { + this.signal = signal; + this.index = index; + } + + @Override + public void run() { + for (int j = 0; j < WRITE_LOOP; j++) { + signal.syncCountDownBeforeInsert(); + try { + int insertDeviceCount = insertRecords(index, signal.currentTimestamp); + totalRowNumber.addAndGet(insertDeviceCount); + signal.finishInsertAndWait(j); + } catch (Exception e) { + LOGGER.error("insert error. Thread: {}. Error:", index, e); + } + } + LOGGER.info("insert worker finished"); + } + } + + public static void main(String[] args) throws InterruptedException { + // Choose the SessionPool you going to use + constructRedirectSessionPool(); + + measurements = new ArrayList<>(); + types = new ArrayList<>(); + for (int i = 0; i < SENSOR_NUMBER; i++) { + measurements.add("s_" + i); + types.add(TSDataType.FLOAT); + } + + r = new Random(); + + for (int i = 0; i < floatData.length; i++) { + floatData[i] = r.nextFloat(); + } + + Thread[] threads = new Thread[THREAD_NUMBER]; + + SyncWriteSignal signal = new SyncWriteSignal(THREAD_NUMBER); + for (int i = 0; i < THREAD_NUMBER; i++) { + threads[i] = new Thread(new InsertWorker(signal, i)); + } + + // count total execution time + long startTime = System.currentTimeMillis(); + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + sessionPool.close(); + System.out.println(System.currentTimeMillis() - startTime); + })); + + // start write thread + for (Thread thread : threads) { + thread.start(); + } + + long startTime1 = System.nanoTime(); + new Thread( + () -> { + while (true) { + try { + TimeUnit.MINUTES.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + long currentTime = System.nanoTime(); + LOGGER.info( + "write rate: {} lines/minute", + totalRowNumber.get() / ((currentTime - startTime1) / 60_000_000_000L)); + } + }) + .start(); + } + + // more insert example, see SessionExample.java + private static int insertRecords(int threadIndex, long timestamp) + throws StatementExecutionException, IoTDBConnectionException { + List<String> deviceIds = new ArrayList<>(); + List<Long> times = new ArrayList<>(); + List<List<String>> measurementsList = new ArrayList<>(); + List<List<TSDataType>> typesList = new ArrayList<>(); + List<List<Object>> valuesList = new ArrayList<>(); + int deviceCount = 0; + for (int j = threadIndex; j < DEVICE_NUMBER; j += THREAD_NUMBER) { + String deviceId = "root.test.g_0.d_" + j; + deviceIds.add(deviceId); + times.add(timestamp); + List<Object> values = new ArrayList<>(); + for (int i = 0; i < SENSOR_NUMBER; i++) { + values.add(floatData[(int) ((i + j + timestamp) % floatData.length)]); + } + valuesList.add(values); + measurementsList.add(measurements); + typesList.add(types); + deviceCount++; + } + + sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList); + return deviceCount; + } +} diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java new file mode 100644 index 0000000000..ca352705c7 --- /dev/null +++ b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb; + +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class WriteTestFixParallel { + + private static SessionPool sessionPool; + + private static final Logger LOGGER = LoggerFactory.getLogger(WriteTestFixParallel.class); + + private static int THREAD_NUMBER = 300; + + private static int DEVICE_NUMBER = 20000; + + private static int SENSOR_NUMBER = 500; + + private static int WRITE_LOOP = 100000; + + private static List<String> measurements; + + private static List<TSDataType> types; + + private static AtomicInteger totalRowNumber = new AtomicInteger(); + + private static Random r; + + /** Build a custom SessionPool for this example */ + + /** Build a redirect-able SessionPool for this example */ + private static void constructRedirectSessionPool() { + List<String> nodeUrls = new ArrayList<>(); + // nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("10.24.58.58:6667"); + nodeUrls.add("10.24.58.67:6667"); + nodeUrls.add("10.24.58.69:6667"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(500) + .build(); + sessionPool.setFetchSize(10000); + } + + private static class SyncWriteSignal { + protected volatile boolean needResetLatch = true; + protected CountDownLatch latch; + protected long currentTimestamp; + protected Semaphore semaphore; + protected int count; + + protected SyncWriteSignal(int count) { + this.count = count; + this.semaphore = new Semaphore(20); + } + + protected void syncCountDownBeforeInsert() throws InterruptedException { + if (needResetLatch) { + synchronized (this) { + if (needResetLatch) { + latch = new CountDownLatch(this.count); + needResetLatch = false; + currentTimestamp = System.currentTimeMillis(); + } + } + } + semaphore.acquire(); + } + + protected void finishInsertAndWait(int loopIndex) throws InterruptedException { + semaphore.release(); + CountDownLatch currentLatch = latch; + synchronized (this) { + currentLatch.countDown(); + if (currentLatch.getCount() == 0) { + needResetLatch = true; + LOGGER.info( + "write loop[{}] finished. cost: {}ms. total rows: {}. total points: {}", + loopIndex, + (System.currentTimeMillis() - currentTimestamp), + totalRowNumber.get(), + (long) totalRowNumber.get() * SENSOR_NUMBER); + } + } + currentLatch.await(); + } + } + + private static class InsertWorker implements Runnable { + private SyncWriteSignal signal; + private int index; + + protected InsertWorker(SyncWriteSignal signal, int index) { + this.signal = signal; + this.index = index; + } + + @Override + public void run() { + for (int j = 0; j < WRITE_LOOP; j++) { + try { + signal.syncCountDownBeforeInsert(); + int insertDeviceCount = insertRecords(index, signal.currentTimestamp); + totalRowNumber.addAndGet(insertDeviceCount); + signal.finishInsertAndWait(j); + } catch (Exception e) { + LOGGER.error("insert error. Thread: {}. Error:", index, e); + } + } + LOGGER.info("insert worker finished"); + } + } + + public static void main(String[] args) throws InterruptedException { + // Choose the SessionPool you going to use + constructRedirectSessionPool(); + + measurements = new ArrayList<>(); + types = new ArrayList<>(); + for (int i = 0; i < SENSOR_NUMBER; i++) { + measurements.add("s_" + i); + types.add(TSDataType.FLOAT); + } + + Thread[] threads = new Thread[THREAD_NUMBER]; + + SyncWriteSignal signal = new SyncWriteSignal(THREAD_NUMBER); + for (int i = 0; i < THREAD_NUMBER; i++) { + threads[i] = new Thread(new InsertWorker(signal, i)); + } + + // count total execution time + r = new Random(); + long startTime = System.currentTimeMillis(); + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + sessionPool.close(); + System.out.println(System.currentTimeMillis() - startTime); + })); + + // start write thread + for (Thread thread : threads) { + thread.start(); + } + + long startTime1 = System.nanoTime(); + new Thread( + () -> { + while (true) { + try { + TimeUnit.MINUTES.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + long currentTime = System.nanoTime(); + LOGGER.info( + "write rate: {} lines/minute", + totalRowNumber.get() / ((currentTime - startTime1) / 60_000_000_000L)); + } + }) + .start(); + } + + // more insert example, see SessionExample.java + private static int insertRecords(int threadIndex, long timestamp) + throws StatementExecutionException, IoTDBConnectionException { + List<String> deviceIds = new ArrayList<>(); + List<Long> times = new ArrayList<>(); + List<List<String>> measurementsList = new ArrayList<>(); + List<List<TSDataType>> typesList = new ArrayList<>(); + List<List<Object>> valuesList = new ArrayList<>(); + int deviceCount = 0; + for (int j = threadIndex; j < DEVICE_NUMBER; j += THREAD_NUMBER) { + String deviceId = "root.test.g_0.d_" + j; + deviceIds.add(deviceId); + times.add(timestamp); + List<Object> values = new ArrayList<>(); + for (int i = 0; i < SENSOR_NUMBER; i++) { + values.add(r.nextFloat()); + } + valuesList.add(values); + measurementsList.add(measurements); + typesList.add(types); + deviceCount++; + } + + sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList); + return deviceCount; + } +}
