This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5195 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2cd4808462af2a4c882b015031b0b2cb2d056e8a Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Dec 14 04:02:09 2022 +0800 basic udf class struct --- .../multi/UniversalUDFQueryTransformer.java | 2 +- .../transformation/dag/udf/base/UDTFExecutor.java | 5 +- .../dag/udf/python/PythonPointCollector.java | 23 ++++++++ .../transformation/dag/udf/python/PythonUDTF.java | 64 ++++++++++++++++++++++ .../dag/udf/python/PythonUDTFExecutor.java | 61 +++++++++++++++++++++ .../dag/udf/python/PythonUDTFPointCollector.java | 22 ++++++++ 6 files changed, 174 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java index 2927006b40..df2aee45ad 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java @@ -34,7 +34,7 @@ public abstract class UniversalUDFQueryTransformer extends UDFQueryTransformer { protected UniversalUDFQueryTransformer(UDTFExecutor executor) { super(executor); - layerPointReader = executor.getCollector().constructPointReaderUsingTrivialEvictionStrategy(); + layerPointReader = executor.getPointCollector(); isLayerPointReaderConstant = layerPointReader.isConstantPointReader(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/base/UDTFExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/base/UDTFExecutor.java index cb5c477030..1d739719b7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/base/UDTFExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/base/UDTFExecutor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.transformation.dag.udf.base; import org.apache.iotdb.commons.udf.service.UDFManagementService; import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; +import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader; import org.apache.iotdb.db.mpp.transformation.datastructure.tv.ElasticSerializableTVList; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.udf.api.UDTF; @@ -160,7 +161,7 @@ public class UDTFExecutor { return configurations; } - public ElasticSerializableTVList getCollector() { - return collector; + public LayerPointReader getPointCollector() { + return collector.constructPointReaderUsingTrivialEvictionStrategy(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonPointCollector.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonPointCollector.java new file mode 100644 index 0000000000..b43e9f02c3 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonPointCollector.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.transformation.dag.udf.python; + +public class PythonPointCollector { +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonUDTF.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonUDTF.java new file mode 100644 index 0000000000..96a808bb09 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonUDTF.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.transformation.dag.udf.python; + +import org.apache.iotdb.udf.api.UDTF; +import org.apache.iotdb.udf.api.access.Row; +import org.apache.iotdb.udf.api.access.RowWindow; +import org.apache.iotdb.udf.api.collector.PointCollector; +import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations; +import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator; +import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters; + +public class PythonUDTF implements UDTF { + @Override + public void validate(UDFParameterValidator validator) throws Exception { + UDTF.super.validate(validator); + } + + @Override + public void beforeDestroy() { + UDTF.super.beforeDestroy(); + } + + @Override + public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) + throws Exception {} + + @Override + public void transform(Row row, PointCollector collector) throws Exception { + UDTF.super.transform(row, collector); + } + + @Override + public void transform(RowWindow rowWindow, PointCollector collector) throws Exception { + UDTF.super.transform(rowWindow, collector); + } + + @Override + public Object transform(Row row) throws Exception { + return UDTF.super.transform(row); + } + + @Override + public void terminate(PointCollector collector) throws Exception { + UDTF.super.terminate(collector); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonUDTFExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonUDTFExecutor.java new file mode 100644 index 0000000000..a1fe9dabd0 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonUDTFExecutor.java @@ -0,0 +1,61 @@ +package org.apache.iotdb.db.mpp.transformation.dag.udf.python; + +import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader; +import org.apache.iotdb.db.mpp.transformation.dag.udf.base.UDTFExecutor; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.udf.api.access.Row; +import org.apache.iotdb.udf.api.access.RowWindow; + +import java.time.ZoneId; +import java.util.List; +import java.util.Map; + +public class PythonUDTFExecutor extends UDTFExecutor { + + public PythonUDTFExecutor(String functionName, ZoneId zoneId) { + super(functionName, zoneId); + } + + @Override + public void beforeStart( + long queryId, + float collectorMemoryBudgetInMB, + List<String> childExpressions, + List<TSDataType> childExpressionDataTypes, + Map<String, String> attributes) {} + + @Override + public void execute(Row row, boolean isCurrentRowNull) {} + + @Override + public void execute(RowWindow rowWindow) {} + + @Override + public void terminate() {} + + @Override + public void beforeDestroy() {} + + @Override + public LayerPointReader getPointCollector() { + throw new UnsupportedOperationException(); + } + + /** + * The strategy {@code MappableRowByRowAccessStrategy} is not supported by the Python UDF + * framework. + */ + @Override + public void execute(Row row) { + throw new UnsupportedOperationException(); + } + + /** + * The strategy {@code MappableRowByRowAccessStrategy} is not supported by the Python UDF + * framework. + */ + @Override + public Object getCurrentValue() { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonUDTFPointCollector.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonUDTFPointCollector.java new file mode 100644 index 0000000000..0bc32cab65 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/python/PythonUDTFPointCollector.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.transformation.dag.udf.python; + +public class PythonUDTFPointCollector {}
