JackieTien97 commented on code in PR #14791: URL: https://github.com/apache/iotdb/pull/14791#discussion_r1986566593
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.function.partition; + +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.DateUtils; + +import java.time.LocalDate; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; + +/** Parts of partition. 
*/ +public class Slice { + + private final Column[] requiredColumns; + private final Column[] passThroughColumns; + private final List<Type> dataTypes; + private final long size; + + public Slice( + int startIndex, + int endIndex, + Column[] columns, + List<Integer> requiredChannels, + List<Integer> passThroughChannels, + List<Type> dataTypes) { + this.size = endIndex - startIndex; + List<Column> partitionColumns = + Arrays.stream(columns) + .map(i -> i.getRegion(startIndex, (int) size)) + .collect(Collectors.toList()); + this.requiredColumns = + requiredChannels.stream().map(partitionColumns::get).toArray(Column[]::new); + this.passThroughColumns = + passThroughChannels.stream().map(partitionColumns::get).toArray(Column[]::new); + this.dataTypes = dataTypes; + } + + public long getSize() { + return size; + } + + public Column[] getPassThroughResult(int[] indexes) { + return Arrays.stream(passThroughColumns) + .map(i -> i.getPositions(indexes, 0, indexes.length)) + .toArray(Column[]::new); + } + + public Iterator<Record> getRequiredRecordIterator() { + return new Iterator<Record>() { + private int curIndex = 0; + + @Override + public boolean hasNext() { + return curIndex < size; + } + + @Override + public Record next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final int idx = curIndex++; + return getRecord(idx, requiredColumns); + } + }; + } + + private Record getRecord(int offset, Column[] originalColumns) { + return new Record() { Review Comment: I think it may be better to add a RecordImpl class to implement Record instead of constructing a new class each time. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.function; + +import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class PartitionRecognizer { + + private final List<Integer> partitionChannels; + private final List<Object> partitionValues; + private final List<Integer> requiredChannels; + private final List<Integer> passThroughChannels; + private final List<Type> inputDataTypes; + private TsBlock currentTsBlock = null; + private boolean noMoreData = false; + private int currentIndex = 0; + private PartitionState state = PartitionState.INIT_STATE; + + public PartitionRecognizer( + List<Integer> partitionChannels, + List<Integer> requiredChannels, + List<Integer> passThroughChannels, + List<TSDataType> inputDataTypes) { + this.partitionChannels = partitionChannels; + this.partitionValues = new 
ArrayList<>(partitionChannels.size()); + for (int i = 0; i < partitionChannels.size(); i++) { + partitionValues.add(null); + } + this.requiredChannels = requiredChannels; + this.passThroughChannels = passThroughChannels; + this.inputDataTypes = UDFDataTypeTransformer.transformToUDFDataTypeList(inputDataTypes); + } + + // TsBlock is sorted by partition columns already + public void addTsBlock(TsBlock tsBlock) { + if (noMoreData) { + throw new IllegalArgumentException( + "The partition handler is finished, cannot add more data."); + } + currentTsBlock = tsBlock; + } + + /** Marks the handler as finished. */ + public void noMoreData() { + noMoreData = true; + } + + public PartitionState nextState() { + updateState(); + return state; + } + + private void updateState() { + switch (state.getStateType()) { + case INIT: + state = handleInitState(); + break; + case NEW_PARTITION: + state = handleNewPartitionState(); + break; + case ITERATING: + state = handleIteratingState(); + break; + case NEED_MORE_DATA: + state = handleNeedMoreDataState(); + break; + case FINISHED: + // do nothing + return; + } + if (PartitionState.NEED_MORE_DATA_STATE.equals(state)) { + currentIndex = 0; + } + } + + private PartitionState handleInitState() { + if (currentTsBlock == null || currentTsBlock.isEmpty()) { + return PartitionState.INIT_STATE; + } + int endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + + private PartitionState handleNewPartitionState() { + if (currentIndex >= currentTsBlock.getPositionCount()) { + return PartitionState.NEED_MORE_DATA_STATE; + } else { + int endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + } + + private PartitionState handleNeedMoreDataState() { + if (noMoreData) { 
+ return PartitionState.FINISHED_STATE; + } else if (currentTsBlock == null || currentTsBlock.isEmpty()) { + return PartitionState.NEED_MORE_DATA_STATE; + } + int endPartitionIndex = findNextDifferentRowIndex(); + if (endPartitionIndex != 0) { + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.iteratingState(slice); + } else { + currentIndex = endPartitionIndex; + endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + } + + private PartitionState handleIteratingState() { + if (currentIndex >= currentTsBlock.getPositionCount()) { + return PartitionState.NEED_MORE_DATA_STATE; + } else { + int endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + } + + /** + * Find next row index whose partition values are different from the current partition values. If + * all rows have the same partition values, return the position count of the current TsBlock. + */ + private int findNextDifferentRowIndex() { + int i = currentIndex; + while (i < currentTsBlock.getPositionCount()) { + for (int j = 0; j < partitionChannels.size(); j++) { + if (!Objects.equals( + partitionValues.get(j), + currentTsBlock.getColumn(partitionChannels.get(j)).getObject(i))) { Review Comment: I think it may be better to use Comparator<SortKey> to do this ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.function; + +import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class PartitionRecognizer { + + private final List<Integer> partitionChannels; + private final List<Object> partitionValues; + private final List<Integer> requiredChannels; + private final List<Integer> passThroughChannels; + private final List<Type> inputDataTypes; + private TsBlock currentTsBlock = null; + private boolean noMoreData = false; + private int currentIndex = 0; + private PartitionState state = PartitionState.INIT_STATE; + + public PartitionRecognizer( + List<Integer> partitionChannels, + List<Integer> requiredChannels, + List<Integer> passThroughChannels, + List<TSDataType> inputDataTypes) { + this.partitionChannels = partitionChannels; + this.partitionValues = new ArrayList<>(partitionChannels.size()); + for (int i = 0; i < partitionChannels.size(); i++) { + partitionValues.add(null); + } + 
this.requiredChannels = requiredChannels; + this.passThroughChannels = passThroughChannels; + this.inputDataTypes = UDFDataTypeTransformer.transformToUDFDataTypeList(inputDataTypes); + } + + // TsBlock is sorted by partition columns already + public void addTsBlock(TsBlock tsBlock) { + if (noMoreData) { + throw new IllegalArgumentException( + "The partition handler is finished, cannot add more data."); + } + currentTsBlock = tsBlock; + } + + /** Marks the handler as finished. */ + public void noMoreData() { + noMoreData = true; + } + + public PartitionState nextState() { + updateState(); + return state; + } + + private void updateState() { + switch (state.getStateType()) { + case INIT: + state = handleInitState(); + break; + case NEW_PARTITION: + state = handleNewPartitionState(); + break; + case ITERATING: + state = handleIteratingState(); + break; + case NEED_MORE_DATA: + state = handleNeedMoreDataState(); + break; + case FINISHED: + // do nothing + return; + } + if (PartitionState.NEED_MORE_DATA_STATE.equals(state)) { + currentIndex = 0; + } + } + + private PartitionState handleInitState() { + if (currentTsBlock == null || currentTsBlock.isEmpty()) { + return PartitionState.INIT_STATE; + } + int endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + + private PartitionState handleNewPartitionState() { + if (currentIndex >= currentTsBlock.getPositionCount()) { + return PartitionState.NEED_MORE_DATA_STATE; + } else { + int endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + } + + private PartitionState handleNeedMoreDataState() { + if (noMoreData) { + return PartitionState.FINISHED_STATE; + } else if (currentTsBlock == null || currentTsBlock.isEmpty()) { + return 
PartitionState.NEED_MORE_DATA_STATE; + } + int endPartitionIndex = findNextDifferentRowIndex(); + if (endPartitionIndex != 0) { + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.iteratingState(slice); + } else { + currentIndex = endPartitionIndex; + endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + } + + private PartitionState handleIteratingState() { + if (currentIndex >= currentTsBlock.getPositionCount()) { + return PartitionState.NEED_MORE_DATA_STATE; + } else { + int endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); Review Comment: what if endPartitionIndex == endPosition? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.function.partition; + +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.DateUtils; + +import java.time.LocalDate; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; + +/** Parts of partition. */ +public class Slice { + + private final Column[] requiredColumns; + private final Column[] passThroughColumns; + private final List<Type> dataTypes; + private final long size; + + public Slice( + int startIndex, + int endIndex, + Column[] columns, + List<Integer> requiredChannels, + List<Integer> passThroughChannels, + List<Type> dataTypes) { + this.size = endIndex - startIndex; + List<Column> partitionColumns = + Arrays.stream(columns) + .map(i -> i.getRegion(startIndex, (int) size)) + .collect(Collectors.toList()); + this.requiredColumns = + requiredChannels.stream().map(partitionColumns::get).toArray(Column[]::new); + this.passThroughColumns = + passThroughChannels.stream().map(partitionColumns::get).toArray(Column[]::new); + this.dataTypes = dataTypes; + } + + public long getSize() { + return size; + } + + public Column[] getPassThroughResult(int[] indexes) { + return Arrays.stream(passThroughColumns) + .map(i -> i.getPositions(indexes, 0, indexes.length)) + .toArray(Column[]::new); + } + + public Iterator<Record> getRequiredRecordIterator() { + return new Iterator<Record>() { + private int curIndex = 0; + + @Override + public boolean hasNext() { + return curIndex < size; + } + + @Override + public Record next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final int idx = curIndex++; + return getRecord(idx, requiredColumns); + } + }; + } + + private 
Record getRecord(int offset, Column[] originalColumns) { + return new Record() { Review Comment: same as next() method in `org.apache.iotdb.commons.udf.access.RecordIterator` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.function; + +import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class PartitionRecognizer { + + private final List<Integer> partitionChannels; + private final List<Object> partitionValues; + private final List<Integer> requiredChannels; + private final List<Integer> passThroughChannels; + private final List<Type> inputDataTypes; + private TsBlock currentTsBlock = null; + private boolean noMoreData = false; + private int currentIndex = 0; + private PartitionState state = PartitionState.INIT_STATE; + + public PartitionRecognizer( + List<Integer> partitionChannels, + List<Integer> requiredChannels, + List<Integer> passThroughChannels, + List<TSDataType> inputDataTypes) { + this.partitionChannels = partitionChannels; + this.partitionValues = new ArrayList<>(partitionChannels.size()); + for (int i = 0; i < partitionChannels.size(); i++) { + partitionValues.add(null); + } + this.requiredChannels = requiredChannels; + this.passThroughChannels = passThroughChannels; + this.inputDataTypes = UDFDataTypeTransformer.transformToUDFDataTypeList(inputDataTypes); + } + + // TsBlock is sorted by partition columns already + public void addTsBlock(TsBlock tsBlock) { + if (noMoreData) { + throw new IllegalArgumentException( + "The partition handler is finished, cannot add more data."); + } + currentTsBlock = tsBlock; + } + + /** Marks the handler as finished. 
*/ + public void noMoreData() { + noMoreData = true; + } + + public PartitionState nextState() { + updateState(); + return state; + } + + private void updateState() { + switch (state.getStateType()) { + case INIT: + state = handleInitState(); + break; + case NEW_PARTITION: + state = handleNewPartitionState(); + break; + case ITERATING: + state = handleIteratingState(); + break; + case NEED_MORE_DATA: + state = handleNeedMoreDataState(); + break; + case FINISHED: + // do nothing + return; + } + if (PartitionState.NEED_MORE_DATA_STATE.equals(state)) { + currentIndex = 0; + } + } + + private PartitionState handleInitState() { + if (currentTsBlock == null || currentTsBlock.isEmpty()) { + return PartitionState.INIT_STATE; + } + int endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + + private PartitionState handleNewPartitionState() { + if (currentIndex >= currentTsBlock.getPositionCount()) { + return PartitionState.NEED_MORE_DATA_STATE; + } else { + int endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + } + + private PartitionState handleNeedMoreDataState() { + if (noMoreData) { + return PartitionState.FINISHED_STATE; + } else if (currentTsBlock == null || currentTsBlock.isEmpty()) { + return PartitionState.NEED_MORE_DATA_STATE; + } + int endPartitionIndex = findNextDifferentRowIndex(); + if (endPartitionIndex != 0) { + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.iteratingState(slice); + } else { + currentIndex = endPartitionIndex; + endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return 
PartitionState.newPartitionState(slice); + } + } + + private PartitionState handleIteratingState() { Review Comment: It seems that `handleIteratingState` is same as `handleNewPartitionState` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.function; + +import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class PartitionRecognizer { + + private final List<Integer> partitionChannels; + private final List<Object> partitionValues; + private final List<Integer> requiredChannels; + private final List<Integer> passThroughChannels; + private final List<Type> inputDataTypes; + private TsBlock currentTsBlock = null; + private boolean noMoreData = false; + private int currentIndex = 0; + private PartitionState state = PartitionState.INIT_STATE; + + public PartitionRecognizer( + List<Integer> partitionChannels, + List<Integer> requiredChannels, + List<Integer> passThroughChannels, + List<TSDataType> inputDataTypes) { + this.partitionChannels = partitionChannels; + this.partitionValues = new ArrayList<>(partitionChannels.size()); + for (int i = 0; i < partitionChannels.size(); i++) { + partitionValues.add(null); + } + this.requiredChannels = requiredChannels; + this.passThroughChannels = passThroughChannels; + this.inputDataTypes = UDFDataTypeTransformer.transformToUDFDataTypeList(inputDataTypes); + } + + // TsBlock is sorted by partition columns already + public void addTsBlock(TsBlock tsBlock) { + if (noMoreData) { + throw new IllegalArgumentException( + "The partition handler is finished, cannot add more data."); + } + currentTsBlock = tsBlock; + } + + /** Marks the handler as finished. 
*/ + public void noMoreData() { + noMoreData = true; + } + + public PartitionState nextState() { + updateState(); + return state; + } + + private void updateState() { + switch (state.getStateType()) { + case INIT: + state = handleInitState(); + break; + case NEW_PARTITION: + state = handleNewPartitionState(); + break; + case ITERATING: + state = handleIteratingState(); + break; + case NEED_MORE_DATA: + state = handleNeedMoreDataState(); + break; + case FINISHED: + // do nothing + return; + } + if (PartitionState.NEED_MORE_DATA_STATE.equals(state)) { + currentIndex = 0; + } + } + + private PartitionState handleInitState() { + if (currentTsBlock == null || currentTsBlock.isEmpty()) { + return PartitionState.INIT_STATE; + } + int endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + + private PartitionState handleNewPartitionState() { + if (currentIndex >= currentTsBlock.getPositionCount()) { + return PartitionState.NEED_MORE_DATA_STATE; + } else { + int endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + } + + private PartitionState handleNeedMoreDataState() { + if (noMoreData) { + return PartitionState.FINISHED_STATE; + } else if (currentTsBlock == null || currentTsBlock.isEmpty()) { + return PartitionState.NEED_MORE_DATA_STATE; + } + int endPartitionIndex = findNextDifferentRowIndex(); + if (endPartitionIndex != 0) { + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.iteratingState(slice); + } else { + currentIndex = endPartitionIndex; + endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return 
PartitionState.newPartitionState(slice); + } + } + + private PartitionState handleIteratingState() { + if (currentIndex >= currentTsBlock.getPositionCount()) { + return PartitionState.NEED_MORE_DATA_STATE; + } else { + int endPartitionIndex = findNextDifferentRowIndex(); + Slice slice = getSlice(currentIndex, endPartitionIndex); + currentIndex = endPartitionIndex; + return PartitionState.newPartitionState(slice); + } + } + + /** + * Find next row index whose partition values are different from the current partition values. If + * all rows have the same partition values, return the position count of the current TsBlock. + */ + private int findNextDifferentRowIndex() { + int i = currentIndex; + while (i < currentTsBlock.getPositionCount()) { + for (int j = 0; j < partitionChannels.size(); j++) { + if (!Objects.equals( + partitionValues.get(j), + currentTsBlock.getColumn(partitionChannels.get(j)).getObject(i))) { Review Comment: what if index i of Column j is null? getObject won't handle this case ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.function; + +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.SliceCache; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.LongColumnBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; + +// only one input source is supported now +public class TableFunctionOperator implements ProcessOperator { + + private final OperatorContext operatorContext; + private final Operator inputOperator; + private final TableFunctionProcessorProvider processorProvider; + private final PartitionRecognizer partitionRecognizer; + private final TsBlockBuilder blockBuilder; + private final int properChannelCount; + private final boolean needPassThrough; + + private TableFunctionDataProcessor 
processor; + private PartitionState partitionState; + private ListenableFuture<?> isBlocked; + private boolean finished = false; + private ColumnBuilder passThroughIndexBuilder; + + private SliceCache sliceCache; + + public TableFunctionOperator( + OperatorContext operatorContext, + TableFunctionProcessorProvider processorProvider, + Operator inputOperator, + List<TSDataType> inputDataTypes, + List<TSDataType> outputDataTypes, + int properChannelCount, + List<Integer> requiredChannels, + List<Integer> passThroughChannels, + List<Integer> partitionChannels) { + this.operatorContext = operatorContext; + this.inputOperator = inputOperator; + this.properChannelCount = properChannelCount; + this.processorProvider = processorProvider; + this.partitionRecognizer = + new PartitionRecognizer( + partitionChannels, requiredChannels, passThroughChannels, inputDataTypes); + this.needPassThrough = properChannelCount != outputDataTypes.size(); + this.partitionState = null; + this.blockBuilder = new TsBlockBuilder(outputDataTypes); + this.sliceCache = new SliceCache(); + } + + @Override + public OperatorContext getOperatorContext() { + return this.operatorContext; + } + + @Override + public ListenableFuture<?> isBlocked() { + if (isBlocked == null) { + isBlocked = tryGetNextTsBlock(); + } + return isBlocked; + } + + private ListenableFuture<?> tryGetNextTsBlock() { + try { + if (inputOperator.isFinished()) { + partitionRecognizer.noMoreData(); + return NOT_BLOCKED; + } + if (!inputOperator.isBlocked().isDone()) { + return inputOperator.isBlocked(); + } + if (inputOperator.hasNextWithTimer()) { + partitionRecognizer.addTsBlock(inputOperator.nextWithTimer()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return NOT_BLOCKED; + } + + @Override + public TsBlock next() throws Exception { + PartitionState.StateType stateType = partitionState.getStateType(); + Slice slice = partitionState.getSlice(); + if (stateType == PartitionState.StateType.INIT + || stateType == 
PartitionState.StateType.NEED_MORE_DATA) { + consumeCurrentPartitionState(); + consumeCurrentSourceTsBlock(); + return null; + } else { + List<ColumnBuilder> properColumnBuilders = getProperColumnBuilders(); + ColumnBuilder passThroughIndexBuilder = getPassThroughIndexBuilder(); + if (stateType == PartitionState.StateType.FINISHED) { + if (processor != null) { + processor.finish(properColumnBuilders, passThroughIndexBuilder); + } + finished = true; + TsBlock tsBlock = buildTsBlock(properColumnBuilders, passThroughIndexBuilder); + sliceCache.clear(); + consumeCurrentPartitionState(); + return tsBlock; + } + if (stateType == PartitionState.StateType.NEW_PARTITION) { + if (processor != null) { + // previous partition state has not finished consuming yet + processor.finish(properColumnBuilders, passThroughIndexBuilder); + TsBlock tsBlock = buildTsBlock(properColumnBuilders, passThroughIndexBuilder); + sliceCache.clear(); + processor = null; + return tsBlock; + } else { + processor = processorProvider.getDataProcessor(); + } + } + sliceCache.addSlice(slice); + Iterator<Record> recordIterator = slice.getRequiredRecordIterator(); + while (recordIterator.hasNext()) { + processor.process(recordIterator.next(), properColumnBuilders, passThroughIndexBuilder); + } + consumeCurrentPartitionState(); + return buildTsBlock(properColumnBuilders, passThroughIndexBuilder); + } + } + + private List<ColumnBuilder> getProperColumnBuilders() { + blockBuilder.reset(); + return Arrays.asList(blockBuilder.getValueColumnBuilders()).subList(0, properChannelCount); + } + + private ColumnBuilder getPassThroughIndexBuilder() { + return new LongColumnBuilder(null, 1); + } + + private TsBlock buildTsBlock( + List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + List<ColumnBuilder> passThroughColumnBuilders = + Arrays.asList(blockBuilder.getValueColumnBuilders()) + .subList(properChannelCount, blockBuilder.getValueColumnBuilders().length); + int positionCount = 0; + 
if (properChannelCount > 0) { + // if there is proper column, use its position count + positionCount = properColumnBuilders.get(0).getPositionCount(); + } else if (needPassThrough) { + // if there is no proper column, use pass through column's position count + positionCount = passThroughIndexBuilder.getPositionCount(); + } + if (positionCount == 0) { + return null; + } + blockBuilder.declarePositions(positionCount); + if (needPassThrough) { + // handle pass through column only if needed + Column passThroughIndex = passThroughIndexBuilder.build(); + for (Column[] passThroughColumns : sliceCache.getPassThroughResult(passThroughIndex)) { + for (int i = 0; i < passThroughColumns.length; i++) { + ColumnBuilder passThroughColumnBuilder = passThroughColumnBuilders.get(i); + for (int j = 0; j < passThroughColumns[i].getPositionCount(); j++) { + if (passThroughColumns[i].isNull(j)) { + passThroughColumnBuilder.appendNull(); + } else { + passThroughColumnBuilder.write(passThroughColumns[i], j); + } + } + } + } + } + return blockBuilder.build(new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, positionCount)); + } + + private void consumeCurrentPartitionState() { + partitionState = null; + } + + private void consumeCurrentSourceTsBlock() { + isBlocked = null; + } + + @Override + public boolean hasNext() throws Exception { + if (partitionState == null) { + isBlocked().get(); // wait for the next TsBlock Review Comment: We should never block in hasNext() or next(); the isBlocked().get() call here waits for the next TsBlock. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.function; + +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.SliceCache; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.LongColumnBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; + +// only one input source is supported now +public class TableFunctionOperator implements ProcessOperator { + + private final OperatorContext operatorContext; + private final Operator inputOperator; + private final TableFunctionProcessorProvider processorProvider; + private final PartitionRecognizer partitionRecognizer; + private final TsBlockBuilder blockBuilder; + private final int properChannelCount; + private final boolean needPassThrough; + + private TableFunctionDataProcessor processor; + private PartitionState partitionState; + private ListenableFuture<?> isBlocked; + private boolean finished = false; + private ColumnBuilder passThroughIndexBuilder; + + private SliceCache sliceCache; + + public TableFunctionOperator( + OperatorContext operatorContext, + TableFunctionProcessorProvider processorProvider, + Operator inputOperator, + List<TSDataType> inputDataTypes, + List<TSDataType> outputDataTypes, + int properChannelCount, + List<Integer> requiredChannels, + List<Integer> passThroughChannels, + List<Integer> partitionChannels) { + this.operatorContext = operatorContext; + this.inputOperator = inputOperator; + this.properChannelCount = properChannelCount; + this.processorProvider = processorProvider; + this.partitionRecognizer = + new PartitionRecognizer( + partitionChannels, requiredChannels, passThroughChannels, inputDataTypes); + this.needPassThrough = properChannelCount != outputDataTypes.size(); + this.partitionState = null; + this.blockBuilder = new TsBlockBuilder(outputDataTypes); + this.sliceCache = new SliceCache(); + } + + @Override + public OperatorContext getOperatorContext() { + return this.operatorContext; + } + + @Override + public ListenableFuture<?> isBlocked() { + if (isBlocked == null) { + isBlocked = tryGetNextTsBlock(); + } + return isBlocked; + } + + private ListenableFuture<?> tryGetNextTsBlock() { + try { + if 
(inputOperator.isFinished()) { + partitionRecognizer.noMoreData(); + return NOT_BLOCKED; + } + if (!inputOperator.isBlocked().isDone()) { + return inputOperator.isBlocked(); + } + if (inputOperator.hasNextWithTimer()) { + partitionRecognizer.addTsBlock(inputOperator.nextWithTimer()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return NOT_BLOCKED; + } + + @Override + public TsBlock next() throws Exception { + PartitionState.StateType stateType = partitionState.getStateType(); + Slice slice = partitionState.getSlice(); + if (stateType == PartitionState.StateType.INIT + || stateType == PartitionState.StateType.NEED_MORE_DATA) { + consumeCurrentPartitionState(); + consumeCurrentSourceTsBlock(); + return null; + } else { + List<ColumnBuilder> properColumnBuilders = getProperColumnBuilders(); + ColumnBuilder passThroughIndexBuilder = getPassThroughIndexBuilder(); + if (stateType == PartitionState.StateType.FINISHED) { + if (processor != null) { + processor.finish(properColumnBuilders, passThroughIndexBuilder); + } + finished = true; + TsBlock tsBlock = buildTsBlock(properColumnBuilders, passThroughIndexBuilder); + sliceCache.clear(); + consumeCurrentPartitionState(); + return tsBlock; + } + if (stateType == PartitionState.StateType.NEW_PARTITION) { + if (processor != null) { + // previous partition state has not finished consuming yet + processor.finish(properColumnBuilders, passThroughIndexBuilder); + TsBlock tsBlock = buildTsBlock(properColumnBuilders, passThroughIndexBuilder); + sliceCache.clear(); + processor = null; + return tsBlock; + } else { + processor = processorProvider.getDataProcessor(); + } + } + sliceCache.addSlice(slice); + Iterator<Record> recordIterator = slice.getRequiredRecordIterator(); + while (recordIterator.hasNext()) { + processor.process(recordIterator.next(), properColumnBuilders, passThroughIndexBuilder); + } + consumeCurrentPartitionState(); + return buildTsBlock(properColumnBuilders, passThroughIndexBuilder); + } + } 
+ + private List<ColumnBuilder> getProperColumnBuilders() { + blockBuilder.reset(); + return Arrays.asList(blockBuilder.getValueColumnBuilders()).subList(0, properChannelCount); + } + + private ColumnBuilder getPassThroughIndexBuilder() { + return new LongColumnBuilder(null, 1); + } + + private TsBlock buildTsBlock( + List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + List<ColumnBuilder> passThroughColumnBuilders = + Arrays.asList(blockBuilder.getValueColumnBuilders()) + .subList(properChannelCount, blockBuilder.getValueColumnBuilders().length); + int positionCount = 0; + if (properChannelCount > 0) { + // if there is proper column, use its position count + positionCount = properColumnBuilders.get(0).getPositionCount(); + } else if (needPassThrough) { + // if there is no proper column, use pass through column's position count + positionCount = passThroughIndexBuilder.getPositionCount(); + } + if (positionCount == 0) { + return null; + } + blockBuilder.declarePositions(positionCount); + if (needPassThrough) { + // handle pass through column only if needed + Column passThroughIndex = passThroughIndexBuilder.build(); + for (Column[] passThroughColumns : sliceCache.getPassThroughResult(passThroughIndex)) { + for (int i = 0; i < passThroughColumns.length; i++) { + ColumnBuilder passThroughColumnBuilder = passThroughColumnBuilders.get(i); + for (int j = 0; j < passThroughColumns[i].getPositionCount(); j++) { + if (passThroughColumns[i].isNull(j)) { + passThroughColumnBuilder.appendNull(); + } else { + passThroughColumnBuilder.write(passThroughColumns[i], j); + } + } + } + } + } + return blockBuilder.build(new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, positionCount)); + } + + private void consumeCurrentPartitionState() { + partitionState = null; + } + + private void consumeCurrentSourceTsBlock() { + isBlocked = null; + } + + @Override + public boolean hasNext() throws Exception { + if (partitionState == null) { + isBlocked().get(); // 
wait for the next TsBlock + partitionState = partitionRecognizer.nextState(); + } + return !finished; + } + + @Override + public void close() throws Exception { + sliceCache.close(); + inputOperator.close(); + } + + @Override + public boolean isFinished() throws Exception { + return finished; + } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } Review Comment: Please implement these methods (calculateMaxPeekMemory, calculateMaxReturnSize, calculateRetainedSizeAfterCallingNext, ramBytesUsed) instead of returning 0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
