This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/M4-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 302007323e314ee2e08b77263904c766c8c9f00c Author: Lei Rui <[email protected]> AuthorDate: Wed Sep 27 22:46:13 2023 +0800 add MinMax --- .../db/query/udf/builtin/BuiltinFunction.java | 3 +- .../iotdb/db/query/udf/builtin/UDTFMinMax.java | 333 ++++++++++++++++++++- .../apache/iotdb/db/integration/m4/MyTest6.java | 1 + .../integration/m4/{MyTest6.java => MyTest7.java} | 178 +++++++---- 4 files changed, 456 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java index ebde0642972..5a9639faaf4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java @@ -49,7 +49,8 @@ public enum BuiltinFunction { BOTTOM_K("BOTTOM_K", UDTFBottomK.class), M4("M4", UDTFM4MAC.class), M4_TW("M4_TW", UDTFM4.class), - Sample("SAMPLE", UDTFSample.class); + Sample("SAMPLE", UDTFSample.class), + MinMax("MINMAX", UDTFMinMax.class); ; private final String functionName; diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFMinMax.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFMinMax.java index da6dee8a979..6a8b1e1695a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFMinMax.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFMinMax.java @@ -1,3 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.query.udf.builtin;
 
-public class UDTFMinMax {}
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.IOMonitor2;
+import org.apache.iotdb.tsfile.read.common.IOMonitor2.DataSetType;
+
+import java.io.IOException;
+
+/**
+ * Built-in UDTF that reports the minimum-value and the maximum-value point of every interval.
+ *
+ * <p>The query range {@code [tqs, tqe)} is divided into {@code w} intervals of equal length. For
+ * each interval one TEXT row is emitted at the interval's start time: {@code
+ * BottomPoint=(time,value), TopPoint=(time,value)} when the interval contains data, or {@code
+ * empty} otherwise. When several points share an extreme value, the first one encountered is
+ * reported.
+ */
+public class UDTFMinMax implements UDTF {
+
+  /** Output of an interval that received no point. */
+  private static final String EMPTY_INTERVAL = "empty";
+
+  private static final String OUT_OF_RANGE_MESSAGE =
+      "Make sure the range time filter is time>=tqs and time<tqe";
+
+  protected TSDataType dataType;
+  protected long tqs; // start of the query range, inclusive
+  protected long tqe; // end of the query range, exclusive
+  protected int w; // number of intervals
+
+  // timestamps of the minimum-value and maximum-value point of the current interval
+  private long bottomTime;
+  private long topTime;
+
+  // extreme values of the current interval; only the pair matching dataType is used
+  private int intMaxV;
+  private long longMaxV;
+  private float floatMaxV;
+  private double doubleMaxV;
+
+  private int intMinV;
+  private long longMinV;
+  private float floatMinV;
+  private double doubleMinV;
+
+  private String[] result; // one output string per interval
+  private int idx; // index of the interval being accumulated, -1 before the first point
+  private boolean intervalEmpty; // true until the current interval accepts its first point
+
+  /** Resets the per-interval state before a new interval is accumulated. */
+  private void init() {
+    this.bottomTime = 0;
+    this.topTime = 0;
+    // An explicit flag replaces sentinel extremes: Float.MIN_VALUE and Double.MIN_VALUE are the
+    // smallest POSITIVE values, so using them as the initial maximum broke all-negative data.
+    this.intervalEmpty = true;
+  }
+
+  /**
+   * Requires exactly one numeric input series and the attributes {@code tqs}, {@code tqe} and
+   * {@code w}.
+   *
+   * @throws UDFException if the validator rejects the inputs
+   */
+  @Override
+  public void validate(UDFParameterValidator validator) throws UDFException {
+    IOMonitor2.dataSetType = DataSetType.UDTFAlignByTimeDataSet_M4_POINT;
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE)
+        .validateRequiredAttribute("tqs")
+        .validateRequiredAttribute("tqe")
+        .validateRequiredAttribute("w");
+  }
+
+  /**
+   * Reads the query range and interval count, checks them, and prepares the result buffer.
+   *
+   * @throws MetadataException if {@code w} is not positive, if {@code tqe} is not greater than
+   *     {@code tqs}, or if {@code tqe - tqs} is not a multiple of {@code w}
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws MetadataException {
+    dataType = parameters.getDataType(0);
+    tqs = parameters.getLong("tqs"); // closed
+    tqe = parameters.getLong("tqe"); // open
+    w = parameters.getInt("w");
+    // Reject parameters that would otherwise surface as ArithmeticException (w == 0),
+    // NegativeArraySizeException (w < 0) or a non-positive interval length (tqe <= tqs).
+    if (w <= 0) {
+      throw new MetadataException("w should be a positive integer, but got " + w);
+    }
+    if (tqe <= tqs) {
+      throw new MetadataException(
+          "tqe should be greater than tqs, but got tqs=" + tqs + ", tqe=" + tqe);
+    }
+    if ((tqe - tqs) % w != 0) {
+      throw new MetadataException("You should make tqe-tqs integer divide w");
+    }
+    configurations
+        .setAccessStrategy(new RowByRowAccessStrategy())
+        .setOutputDataType(TSDataType.TEXT);
+    init();
+    this.idx = -1;
+    result = new String[w];
+    for (int i = 0; i < w; i++) {
+      result[i] = EMPTY_INTERVAL;
+    }
+  }
+
+  /**
+   * Feeds one input row into the interval it belongs to. Nothing is collected here; all output is
+   * produced in {@link #terminate(PointCollector)}.
+   *
+   * @throws IOException if the row's timestamp lies outside {@code [tqs, tqe)}
+   */
+  @Override
+  public void transform(Row row, PointCollector collector)
+      throws QueryProcessException, IOException {
+    switch (dataType) {
+      case INT32:
+        transformInt(row.getTime(), row.getInt(0));
+        break;
+      case INT64:
+        transformLong(row.getTime(), row.getLong(0));
+        break;
+      case FLOAT:
+        transformFloat(row.getTime(), row.getFloat(0));
+        break;
+      case DOUBLE:
+        transformDouble(row.getTime(), row.getDouble(0));
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Accumulates an INT32 point into its interval.
+   *
+   * @throws IOException if {@code time} lies outside {@code [tqs, tqe)}
+   */
+  protected void transformInt(long time, int value) throws IOException {
+    enterInterval(time);
+    if (intervalEmpty || value < intMinV) {
+      bottomTime = time;
+      intMinV = value;
+    }
+    if (intervalEmpty || value > intMaxV) {
+      topTime = time;
+      intMaxV = value;
+    }
+    intervalEmpty = false;
+  }
+
+  /**
+   * Accumulates an INT64 point into its interval.
+   *
+   * @throws IOException if {@code time} lies outside {@code [tqs, tqe)}
+   */
+  protected void transformLong(long time, long value) throws IOException {
+    enterInterval(time);
+    if (intervalEmpty || value < longMinV) {
+      bottomTime = time;
+      longMinV = value;
+    }
+    if (intervalEmpty || value > longMaxV) {
+      topTime = time;
+      longMaxV = value;
+    }
+    intervalEmpty = false;
+  }
+
+  /**
+   * Accumulates a FLOAT point into its interval. NaN values are ignored.
+   *
+   * @throws IOException if {@code time} lies outside {@code [tqs, tqe)}
+   */
+  protected void transformFloat(long time, float value) throws IOException {
+    enterInterval(time);
+    if (Float.isNaN(value)) {
+      return; // NaN is unordered, so it can be neither a minimum nor a maximum
+    }
+    if (intervalEmpty || value < floatMinV) {
+      bottomTime = time;
+      floatMinV = value;
+    }
+    if (intervalEmpty || value > floatMaxV) {
+      topTime = time;
+      floatMaxV = value;
+    }
+    intervalEmpty = false;
+  }
+
+  /**
+   * Accumulates a DOUBLE point into its interval. NaN values are ignored.
+   *
+   * @throws IOException if {@code time} lies outside {@code [tqs, tqe)}
+   */
+  protected void transformDouble(long time, double value) throws IOException {
+    enterInterval(time);
+    if (Double.isNaN(value)) {
+      return; // NaN is unordered, so it can be neither a minimum nor a maximum
+    }
+    if (intervalEmpty || value < doubleMinV) {
+      bottomTime = time;
+      doubleMinV = value;
+    }
+    if (intervalEmpty || value > doubleMaxV) {
+      topTime = time;
+      doubleMaxV = value;
+    }
+    intervalEmpty = false;
+  }
+
+  /**
+   * Makes the interval containing {@code time} the current one, recording the previous interval
+   * first when the point starts a later interval.
+   *
+   * @throws IOException if {@code time} lies outside {@code [tqs, tqe)}
+   */
+  private void enterInterval(long time) throws IOException {
+    long intervalLen = (tqe - tqs) / w;
+    // Exact integer arithmetic: a double quotient loses precision for offsets above 2^53, and
+    // floorDiv keeps timestamps before tqs negative so that they are rejected below.
+    long pos = Math.floorDiv(time - tqs, intervalLen);
+    if (pos < 0 || pos >= w) {
+      throw new IOException(OUT_OF_RANGE_MESSAGE);
+    }
+    if (pos > idx) {
+      recordCurrentInterval();
+      idx = (int) pos; // safe: pos < w, and w is an int
+      init(); // clear environment for this new interval
+    }
+  }
+
+  /** Stores the current interval's output, unless no interval or no point has been seen yet. */
+  private void recordCurrentInterval() {
+    if (idx < 0 || intervalEmpty) {
+      return; // the slot keeps its initial "empty" marker
+    }
+    result[idx] = formatCurrentInterval();
+  }
+
+  /** Renders the extremes of the current interval using the representation of the data type. */
+  private String formatCurrentInterval() {
+    String minValue;
+    String maxValue;
+    switch (dataType) {
+      case INT32:
+        minValue = String.valueOf(intMinV);
+        maxValue = String.valueOf(intMaxV);
+        break;
+      case INT64:
+        minValue = String.valueOf(longMinV);
+        maxValue = String.valueOf(longMaxV);
+        break;
+      case FLOAT:
+        minValue = String.valueOf(floatMinV);
+        maxValue = String.valueOf(floatMaxV);
+        break;
+      case DOUBLE:
+        minValue = String.valueOf(doubleMinV);
+        maxValue = String.valueOf(doubleMaxV);
+        break;
+      default:
+        return EMPTY_INTERVAL;
+    }
+    return "BottomPoint=("
+        + bottomTime
+        + ","
+        + minValue
+        + "), "
+        + "TopPoint=("
+        + topTime
+        + ","
+        + maxValue
+        + ")";
+  }
+
+  /**
+   * Records the last accumulated interval (not necessarily interval {@code w - 1}) and emits one
+   * row per interval, timestamped with the interval's start time.
+   */
+  @Override
+  public void terminate(PointCollector collector) throws IOException, QueryProcessException {
+    recordCurrentInterval();
+    // collect result
+    long intervalLen = (tqe - tqs) / w;
+    for (int i = 0; i < w; i++) {
+      long startInterval = tqs + intervalLen * i;
+      collector.putString(startInterval, result[i]);
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest6.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest6.java index 46de9698d4d..c4442855377 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest6.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest6.java @@ -41,6
+41,7 @@ import java.util.Locale; import static org.junit.Assert.fail; public class MyTest6 { + // test LTTB private static final String TIMESTAMP_STR = "Time"; diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest6.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest7.java similarity index 56% copy from server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest6.java copy to server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest7.java index 46de9698d4d..e690338183e 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest6.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest7.java @@ -34,14 +34,13 @@ import org.junit.Test; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Statement; import java.util.Locale; import static org.junit.Assert.fail; -public class MyTest6 { - +public class MyTest7 { + // test MinMax private static final String TIMESTAMP_STR = "Time"; private static String[] creationSqls = @@ -89,83 +88,148 @@ public class MyTest6 { } @Test - public void testM4Function() { - // create timeseries + public void test1() { // test UDF MAC + prepareData1(); + + String[] res = + new String[] { + "0,BottomPoint=(1,5.0), TopPoint=(10,30.0)", + "25,BottomPoint=(25,8.0), TopPoint=(30,40.0)", + "50,BottomPoint=(52,8.0), TopPoint=(54,18.0)", + "75,empty" + }; try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { - statement.execute("SET STORAGE GROUP TO root.m4"); - statement.execute("CREATE TIMESERIES root.m4.d1.s1 with datatype=double,encoding=PLAIN"); - statement.execute("CREATE TIMESERIES root.m4.d1.s2 with datatype=INT64,encoding=PLAIN"); - } catch (SQLException throwable) { - fail(throwable.getMessage()); + long tqs = 0L; + long tqe = 100L; + int w = 4; + boolean hasResultSet = + 
statement.execute( + String.format( + "select MinMax(s0,'tqs'='%1$d','tqe'='%2$d','w'='%3$d') from root.vehicle.d0 where " + + "time>=%1$d and time<%2$d", + tqs, tqe, w)); + + String columnName = + "MinMax(root.vehicle.d0.s0, \"tqs\"=\"%d\", \"tqe\"=\"%d\", \"w\"=\"%d\")"; + Assert.assertTrue(hasResultSet); + try (ResultSet resultSet = statement.getResultSet()) { + int i = 0; + while (resultSet.next()) { + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(String.format(columnName, tqs, tqe, w)); + System.out.println(ans); + Assert.assertEquals(res[i++], ans); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); } + } + + @Test + public void test2() { // test UDF MAC extreme case: empty from the beginning + prepareData2(); - // insert data - String insertTemplate = "INSERT INTO root.m4.d1(timestamp,%s)" + " VALUES(%d,%d)"; + String[] res = new String[] {"0,empty", "50,empty"}; try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { - // "root.m4.d1.s1" data illustration: - // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 1, 5)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 2, 15)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 20, 1)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 25, 8)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 54, 3)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 120, 8)); - statement.execute("FLUSH"); + long tqs = 0L; + long tqe = 100L; + int w = 2; + boolean hasResultSet = + statement.execute( + String.format( + "select MinMax(s0,'tqs'='%1$d','tqe'='%2$d','w'='%3$d') from root.vehicle.d0 where " + + "time>=%1$d and 
time<%2$d", + tqs, tqe, w)); + + String columnName = + "MinMax(root.vehicle.d0.s0, \"tqs\"=\"%d\", \"tqe\"=\"%d\", \"w\"=\"%d\")"; + Assert.assertTrue(hasResultSet); + try (ResultSet resultSet = statement.getResultSet()) { + int i = 0; + while (resultSet.next()) { + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(String.format(columnName, tqs, tqe, w)); + System.out.println(ans); + Assert.assertEquals(res[i++], ans); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static void prepareData1() { + // data: + // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png + try (Connection connection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + + for (String sql : creationSqls) { + statement.execute(sql); + } - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 5, 10)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 8, 8)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 10, 30)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 20, 20)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 25, 8)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 3)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 120, 8)); statement.execute("FLUSH"); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 27, 20)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 30, 40)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 35, 
10)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 40, 20)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 5, 10)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 8, 8)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 30)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 20)); statement.execute("FLUSH"); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 33, 9)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 45, 30)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 52, 8)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 54, 18)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 27, 20)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 40)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 35, 10)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 20)); statement.execute("FLUSH"); - // "root.m4.d1.s2" data: constant value 1 - for (int i = 0; i < 100; i++) { - statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s2", i, 1)); - } + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 33, 9)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 45, 30)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 52, 8)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 18)); statement.execute("FLUSH"); + } catch (Exception e) { e.printStackTrace(); } - - // query tests - test_sample(); } - private void test_sample() { - String[] res = new String[] {"1,5.0", "10,30.0", "33,9.0", "120,8.0"}; - - String sql = "select Sample(s1, 'method'='triangle','k'='4') from root.m4.d1"; + private static void prepareData2() { + try (Connection connection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", 
"root"); + Statement statement = connection.createStatement()) { - try (Connection conn = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); - Statement statement = conn.createStatement()) { - ResultSet resultSet = statement.executeQuery(sql); - int count = 0; - while (resultSet.next()) { - String str = resultSet.getString(1) + "," + resultSet.getString(2); - Assert.assertEquals(res[count], str); - count++; - System.out.println(str); + for (String sql : creationSqls) { + statement.execute(sql); } - Assert.assertEquals(res.length, count); - } catch (SQLException throwable) { - fail(throwable.getMessage()); + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 200, 5)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 300, 15)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 400, 1)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 500, 8)); + statement.execute("FLUSH"); + + } catch (Exception e) { + e.printStackTrace(); } } }
