This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d32aa168d108eaa4fd6a6784eaab92b97f8408b3 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Mar 8 23:40:05 2022 +0800 design of source, sink and internal operators --- .../distribution/operator/ExecutableOperator.java | 34 ++++++++++++++++++ .../operator/internal/DeviceMergeOperator.java | 2 +- .../operator/internal/FillOperator.java | 2 +- .../operator/internal/FilterInternalOperator.java | 2 +- .../operator/internal/GroupByLevelOperator.java | 2 +- .../operator/internal/InternalOperator.java | 20 +++-------- .../operator/internal/LimitOperator.java | 2 +- .../operator/internal/OffsetOperator.java | 2 +- .../operator/internal/SeriesAggregateOperator.java | 2 +- .../operator/internal/SortOperator.java | 2 +- .../operator/internal/TimeJoinOperator.java | 2 +- .../operator/internal/WithoutOperator.java | 2 +- .../operator/sink/CsvSinkOperator.java | 41 ++++++++++++++++++++++ .../operator/sink/RestSinkOperator.java | 41 ++++++++++++++++++++++ .../distribution/operator/sink/SinkOperator.java | 28 +++++++++++++++ .../operator/sink/ThriftSinkOperator.java | 41 ++++++++++++++++++++++ .../operator/source/CsvSourceOperator.java | 41 ++++++++++++++++++++++ .../operator/source/SeriesScanOperator.java | 10 ++++-- .../operator/source/SourceOperator.java | 28 +++++++++++++++ 19 files changed, 277 insertions(+), 27 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java new file mode 100644 index 0000000..00a7a6a --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.TreeNode; + +public abstract class ExecutableOperator<T> extends TreeNode<ExecutableOperator<?>> { + + // Resource control, runtime control... + + // Judge whether current operator has more result + public abstract boolean hasNext(); + + // Get next result batch of this operator + // Return null if there is no more result to return + public abstract T getNextBatch(); +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java index b912994..d4bbced 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java @@ -1,4 +1,4 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.internal; import org.apache.iotdb.cluster.query.distribution.common.Tablet; import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java index 8ee610a..595a27e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java @@ -1,4 +1,4 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.internal; import org.apache.iotdb.cluster.query.distribution.common.FillPolicy; import org.apache.iotdb.cluster.query.distribution.common.Tablet; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java index ecbda0f..f41d768 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java @@ -1,4 +1,4 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.internal; import org.apache.iotdb.cluster.query.distribution.common.Tablet; import org.apache.iotdb.db.qp.logical.crud.FilterOperator; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java index c6e00d2..9cab1c0 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java @@ -1,4 +1,4 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.internal; import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter; import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java index 0692381..a5b8b5b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java @@ -1,17 +1,7 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.internal; -import org.apache.iotdb.cluster.query.distribution.common.TreeNode; +import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator; -/** - * @author xingtanzjr The base class of query executable operators, which is used to compose logical - * query plan. TODO: consider how to restrict the children type for each type of ExecOperator - */ -public abstract class InternalOperator<T> extends TreeNode<InternalOperator<?>> { - - // Judge whether current operator has more result - public abstract boolean hasNext(); - - // Get next result batch of this operator - // Return null if there is no more result to return - public abstract T getNextBatch(); -} +// 从 buffer 拉数据 +// 推送到下游的逻辑 +public abstract class InternalOperator<T> extends ExecutableOperator<T> {} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java index 9075f64..1330eee 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java @@ -1,4 +1,4 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.internal; import org.apache.iotdb.cluster.query.distribution.common.Tablet; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java index c6f79ac..aa82d8c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java @@ -1,4 +1,4 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.internal; import org.apache.iotdb.cluster.query.distribution.common.Tablet; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java index 96a5c86..889c50b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java @@ -1,4 +1,4 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.internal; import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter; import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java index fa83be7..d2aee44 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java @@ -1,4 +1,4 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.internal; import org.apache.iotdb.cluster.query.distribution.common.Tablet; import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java index c4ae6f3..bd7cf37 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java @@ -1,4 +1,4 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.internal; import org.apache.iotdb.cluster.query.distribution.common.Tablet; import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java index 535a3d6..f678a2c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java @@ -1,4 +1,4 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.internal; import org.apache.iotdb.cluster.query.distribution.common.Tablet; import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java new file mode 100644 index 0000000..17bba65 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.query.distribution.operator.sink; + +import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData; + +public class CsvSinkOperator extends SinkOperator<SeriesBatchData> { + + @Override + public void close() throws Exception {} + + @Override + public boolean hasNext() { + return false; + } + + @Override + public SeriesBatchData getNextBatch() { + return null; + } + + @Override + public void open() throws Exception {} +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java new file mode 100644 index 0000000..d8e6588 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.query.distribution.operator.sink; + +import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData; + +public class RestSinkOperator extends SinkOperator<SeriesBatchData> { + + @Override + public void close() throws Exception {} + + @Override + public boolean hasNext() { + return false; + } + + @Override + public SeriesBatchData getNextBatch() { + return null; + } + + @Override + public void open() throws Exception {} +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java new file mode 100644 index 0000000..29c906d --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.query.distribution.operator.sink; + +import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator; + +// 构建与客户端的联系。 +public abstract class SinkOperator<T> extends ExecutableOperator<T> implements AutoCloseable { + + public abstract void open() throws Exception; +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java new file mode 100644 index 0000000..c9de077 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.query.distribution.operator.sink; + +import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData; + +public class ThriftSinkOperator extends SinkOperator<SeriesBatchData> { + + @Override + public void close() throws Exception {} + + @Override + public boolean hasNext() { + return false; + } + + @Override + public SeriesBatchData getNextBatch() { + return null; + } + + @Override + public void open() throws Exception {} +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java new file mode 100644 index 0000000..ff82128 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.query.distribution.operator.source; + +import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData; + +public class CsvSourceOperator extends SourceOperator<SeriesBatchData> { + + @Override + public void close() throws Exception {} + + @Override + public boolean hasNext() { + return false; + } + + @Override + public SeriesBatchData getNextBatch() { + return null; + } + + @Override + public void open() throws Exception {} +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java index 9cd6043..fd67f9e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java @@ -1,4 +1,4 @@ -package org.apache.iotdb.cluster.query.distribution.operator; +package org.apache.iotdb.cluster.query.distribution.operator.source; import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData; import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; @@ -18,7 +18,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; * * <p>Children type: [] */ -public class SeriesScanOperator extends InternalOperator<SeriesBatchData> { +public class SeriesScanOperator extends SourceOperator<SeriesBatchData> { // The path of the target series which will be scanned. private Path seriesPath; @@ -55,4 +55,10 @@ public class SeriesScanOperator extends InternalOperator<SeriesBatchData> { public Statistics<?> getNextStatisticBetween(TimeRange timeRange) { return null; } + + @Override + public void close() throws Exception {} + + @Override + public void open() throws Exception {} } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java new file mode 100644 index 0000000..304cafe --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.query.distribution.operator.source; + +import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator; + +// 区分不同的数据源,可拓展。 +public abstract class SourceOperator<T> extends ExecutableOperator<T> implements AutoCloseable { + + public abstract void open() throws Exception; +}
