This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5692 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a5139bd4420dba28325eecb91637943f7cfadeb9 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Mar 17 12:24:28 2023 +0800 pipe agent skeleton --- .../org/apache/iotdb/db/pipe/agent/PipeAgent.java | 53 ++++++++++++ .../iotdb/db/pipe/agent/PipePluginAgent.java | 4 +- .../iotdb/db/pipe/agent/PipeRuntimeAgent.java | 35 ++++++++ .../apache/iotdb/db/pipe/agent/PipeTaskAgent.java | 35 ++++++++ .../impl/DataNodeInternalRPCServiceImpl.java | 98 ++-------------------- 5 files changed, 135 insertions(+), 90 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java new file mode 100644 index 0000000000..e94807e8e1 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent; + +public class PipeAgent { + + /** Private constructor to prevent users from creating a new instance. */ + private PipeAgent() {} + + /** + * Get the singleton instance of PipeTaskAgent. + * + * @return the singleton instance of PipeTaskAgent + */ + public static PipeTaskAgent task() { + return PipeTaskAgent.getInstance(); + } + + /** + * Get the singleton instance of PipePluginAgent. + * + * @return the singleton instance of PipePluginAgent + */ + public static PipePluginAgent plugin() { + return PipePluginAgent.getInstance(); + } + + /** + * Get the singleton instance of PipeRuntimeAgent. + * + * @return the singleton instance of PipeRuntimeAgent + */ + public static PipeRuntimeAgent runtime() { + return PipeRuntimeAgent.getInstance(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java index d820441dac..a0a62ca118 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java @@ -181,11 +181,13 @@ public class PipePluginAgent { ///////////////////////// Singleton Instance Holder ///////////////////////// + private PipePluginAgent() {} + private static class PipePluginAgentServiceHolder { private static final PipePluginAgent INSTANCE = new PipePluginAgent(); } - public static PipePluginAgent getInstance() { + static PipePluginAgent getInstance() { return PipePluginAgentServiceHolder.INSTANCE; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java new file mode 100644 index 0000000000..fe9ab3ee8a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent; + +public class PipeRuntimeAgent { + + ///////////////////////// Singleton Instance Holder ///////////////////////// + + private PipeRuntimeAgent() {} + + private static class PipeRuntimeAgentHolder { + private static final PipeRuntimeAgent INSTANCE = new PipeRuntimeAgent(); + } + + static PipeRuntimeAgent getInstance() { + return PipeRuntimeAgent.PipeRuntimeAgentHolder.INSTANCE; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java new file mode 100644 index 0000000000..75c8e14636 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent; + +public class PipeTaskAgent { + + ///////////////////////// Singleton Instance Holder ///////////////////////// + + private PipeTaskAgent() {} + + private static class PipeTaskAgentHolder { + private static final PipeTaskAgent INSTANCE = new PipeTaskAgent(); + } + + static PipeTaskAgent getInstance() { + return PipeTaskAgent.PipeTaskAgentHolder.INSTANCE; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java index 60edda2b56..308afe736d 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -19,14 +19,8 @@ package org.apache.iotdb.db.service.thrift.impl; -import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.common.rpc.thrift.TFlushReq; -import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; -import org.apache.iotdb.common.rpc.thrift.TSettleReq; +import com.google.common.collect.ImmutableList; +import org.apache.iotdb.common.rpc.thrift.*; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; @@ -92,17 +86,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeactivateTemplateNode; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.PreDeactivateTemplateNode; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackPreDeactivateTemplateNode; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.*; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler; import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition; import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement; -import org.apache.iotdb.db.pipe.agent.PipePluginAgent; +import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.db.query.control.clientsession.IClientSession; import org.apache.iotdb.db.query.control.clientsession.InternalClientSession; @@ -115,68 +104,7 @@ import org.apache.iotdb.db.trigger.service.TriggerManagementService; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.metrics.type.AutoGauge; import org.apache.iotdb.metrics.utils.MetricLevel; -import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService; -import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq; -import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq; -import org.apache.iotdb.mpp.rpc.thrift.TCancelResp; -import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq; -import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListWithTemplateReq; -import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq; -import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp; -import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; -import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq; -import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq; -import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; -import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TDeactivateTemplateReq; -import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteSchemaReq; -import org.apache.iotdb.mpp.rpc.thrift.TDeleteModelMetricsReq; -import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq; -import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq; -import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TExecuteCQ; -import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceInfoReq; -import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq; -import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp; -import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesReq; -import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesResp; -import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchReq; -import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchResp; -import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq; -import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp; -import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp; -import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; -import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; -import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq; -import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq; -import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq; -import org.apache.iotdb.mpp.rpc.thrift.TLoadResp; -import org.apache.iotdb.mpp.rpc.thrift.TLoadSample; -import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; -import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq; -import org.apache.iotdb.mpp.rpc.thrift.TRecordModelMetricsReq; -import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq; -import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; -import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq; -import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq; -import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest; -import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse; -import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp; -import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq; -import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp; -import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq; -import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq; -import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq; -import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq; +import org.apache.iotdb.mpp.rpc.thrift.*; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.trigger.api.enums.FailureStrategy; @@ -185,8 +113,6 @@ import org.apache.iotdb.tsfile.exception.NotImplementedException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.write.record.Tablet; - -import com.google.common.collect.ImmutableList; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -195,12 +121,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -1147,8 +1068,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface if (configNodeLocations != null) { ConfigNodeInfo.getInstance() .updateConfigNodeList( - configNodeLocations - .parallelStream() + configNodeLocations.parallelStream() .map(TConfigNodeLocation::getInternalEndPoint) .collect(Collectors.toList())); } @@ -1482,7 +1402,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface public TSStatus createPipePlugin(TCreatePipePluginInstanceReq req) { try { PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(req.pipePluginMeta); - PipePluginAgent.getInstance().register(pipePluginMeta, req.jarFile); + PipeAgent.plugin().register(pipePluginMeta, req.jarFile); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (Exception e) { return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode()) @@ -1493,7 +1413,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus dropPipePlugin(TDropPipePluginInstanceReq req) { try { - PipePluginAgent.getInstance().deregister(req.getPipePluginName(), req.isNeedToDeleteJar()); + PipeAgent.plugin().deregister(req.getPipePluginName(), req.isNeedToDeleteJar()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (Exception e) { return new TSStatus(TSStatusCode.DROP_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode())
