github-advanced-security[bot] commented on code in PR #43:
URL: https://github.com/apache/iotdb-extras/pull/43#discussion_r1964647512


##########
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorSinkTaskExecutor extends CollectorTaskExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CollectorSinkTaskExecutor.class);
+
+  private static final Map<String, ExecutorService> SINK_EXECUTOR = new 
ConcurrentHashMap<>();
+  private static final Map<String, CollectorTask> SINK_TASK_MAP = new 
ConcurrentHashMap<>();
+
+  public boolean validateIfAbsent(final String taskId) {
+    return !SINK_EXECUTOR.containsKey(taskId) && 
!SINK_TASK_MAP.containsKey(taskId);
+  }
+
+  @Override
+  public Optional<ExecutorService> getExecutor(final String taskId) {
+    return Optional.of(
+        
IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-sink-executor-" + 
taskId));
+  }
+
+  @Override
+  public void recordExecution(
+      final CollectorTask collectorTask, final ExecutorService 
executorService) {
+    final String taskId = collectorTask.getTaskId();
+    SINK_EXECUTOR.putIfAbsent(taskId, executorService);
+    SINK_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+    LOGGER.info("register collector sink task {}", taskId);
+  }
+
+  @Override
+  public void eraseExecution(final String taskId) {
+    SINK_TASK_MAP.remove(taskId).stop();
+    SINK_EXECUTOR.remove(taskId).shutdownNow();
+
+    LOGGER.info("deregister collector sink task {}", taskId);

Review Comment:
   ## Log Injection
   
   This log entry depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/11)



##########
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorProcessorTaskExecutor extends CollectorTaskExecutor {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(CollectorProcessorTaskExecutor.class);
+
+  private static final Map<String, ExecutorService> PROCESSOR_EXECUTOR = new 
ConcurrentHashMap<>();
+  private static final Map<String, CollectorTask> PROCESSOR_TASK_MAP = new 
ConcurrentHashMap<>();
+
+  public boolean validateIfAbsent(final String taskId) {
+    return !PROCESSOR_EXECUTOR.containsKey(taskId) && 
!PROCESSOR_TASK_MAP.containsKey(taskId);
+  }
+
+  @Override
+  public Optional<ExecutorService> getExecutor(final String taskId) {
+    return Optional.of(
+        
IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-processor-executor-" 
+ taskId));
+  }
+
+  @Override
+  public void recordExecution(
+      final CollectorTask collectorTask, final ExecutorService 
executorService) {
+    final String taskId = collectorTask.getTaskId();
+    PROCESSOR_EXECUTOR.putIfAbsent(taskId, executorService);
+    PROCESSOR_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+    LOGGER.info("register collector processor task {}", taskId);
+  }
+
+  @Override
+  public void eraseExecution(final String taskId) {
+    PROCESSOR_TASK_MAP.remove(taskId).stop();
+    PROCESSOR_EXECUTOR.remove(taskId).shutdownNow();
+
+    LOGGER.info("deregister collector processor task {}", taskId);

Review Comment:
   ## Log Injection
   
   This log entry depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/9)



##########
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorSourceTaskExecutor extends CollectorTaskExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CollectorSourceTaskExecutor.class);
+
+  private static final Map<String, ExecutorService> SOURCE_EXECUTOR = new 
ConcurrentHashMap<>();
+  private static final Map<String, CollectorTask> SOURCE_TASK_MAP = new 
ConcurrentHashMap<>();
+
+  public boolean validateIfAbsent(final String taskId) {
+    return !SOURCE_EXECUTOR.containsKey(taskId) && 
!SOURCE_TASK_MAP.containsKey(taskId);
+  }
+
+  @Override
+  public Optional<ExecutorService> getExecutor(final String taskId) {
+    return Optional.of(
+        
IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-source-executor-" + 
taskId));
+  }
+
+  @Override
+  public void recordExecution(
+      final CollectorTask collectorTask, final ExecutorService 
executorService) {
+    final String taskId = collectorTask.getTaskId();
+    SOURCE_EXECUTOR.put(taskId, executorService);
+    SOURCE_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+    LOGGER.info("register collector source task {}", taskId);
+  }
+
+  @Override
+  public void eraseExecution(String taskId) {
+    SOURCE_TASK_MAP.remove(taskId).stop();
+    SOURCE_EXECUTOR.remove(taskId).shutdownNow();
+
+    LOGGER.info("deregister collector source task {}", taskId);

Review Comment:
   ## Log Injection
   
   This log entry depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/13)



##########
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.impl;
+
+import org.apache.iotdb.collector.agent.CollectorAgent;
+import org.apache.iotdb.collector.api.v1.AdminApiService;
+import org.apache.iotdb.collector.api.v1.NotFoundException;
+import org.apache.iotdb.collector.api.v1.handler.RequestValidationHandler;
+import org.apache.iotdb.collector.api.v1.model.AlterPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.CreatePipeRequest;
+import org.apache.iotdb.collector.api.v1.model.DropPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.StartPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.StopPipeRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+public class AdminApiServiceImpl extends AdminApiService {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AdminApiServiceImpl.class);
+
+  @Override
+  public Response alterPipe(
+      final AlterPipeRequest alterPipeRequest, final SecurityContext 
securityContext)
+      throws NotFoundException {
+    return Response.ok("alterPipe").build();
+  }
+
+  @Override
+  public Response createPipe(
+      final CreatePipeRequest createPipeRequest, final SecurityContext 
securityContext)
+      throws NotFoundException {
+    RequestValidationHandler.validateCreateRequest(createPipeRequest);
+
+    final boolean createdResult =
+        CollectorAgent.task()
+            .createCollectorTask(
+                createPipeRequest.getSourceAttribute(),
+                createPipeRequest.getProcessorAttribute(),
+                createPipeRequest.getSinkAttribute(),
+                createPipeRequest.getTaskId());
+    if (createdResult) {
+      LOGGER.info("Create task successful");
+      return Response.status(Response.Status.OK).entity("create task 
success").build();
+    }
+    LOGGER.warn("Create task failed");
+    return Response.status(Response.Status.BAD_REQUEST).entity("create task 
fail").build();
+  }
+
+  @Override
+  public Response dropPipe(
+      final DropPipeRequest dropPipeRequest, final SecurityContext 
securityContext)
+      throws NotFoundException {
+    return Response.ok("dropPipe").build();
+  }
+
+  @Override
+  public Response startPipe(
+      final StartPipeRequest startPipeRequest, final SecurityContext 
securityContext)
+      throws NotFoundException {
+    return Response.ok("startPipe").build();
+  }
+
+  @Override
+  public Response stopPipe(
+      final StopPipeRequest stopPipeRequest, final SecurityContext 
securityContext)
+      throws NotFoundException {
+    RequestValidationHandler.validateStopRequest(stopPipeRequest);
+
+    final boolean stopResult = 
CollectorAgent.task().stopCollectorTask(stopPipeRequest.getTaskId());
+    if (stopResult) {
+      LOGGER.info("Stop task: {} successful", stopPipeRequest.getTaskId());
+      return Response.ok().entity("stop task: " + stopPipeRequest.getTaskId() 
+ " success").build();
+    }
+    LOGGER.warn("Stop task: {} failed", stopPipeRequest.getTaskId());

Review Comment:
   ## Log Injection
   
   This log entry depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/15)



##########
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorSourceTaskExecutor extends CollectorTaskExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CollectorSourceTaskExecutor.class);
+
+  private static final Map<String, ExecutorService> SOURCE_EXECUTOR = new 
ConcurrentHashMap<>();
+  private static final Map<String, CollectorTask> SOURCE_TASK_MAP = new 
ConcurrentHashMap<>();
+
+  public boolean validateIfAbsent(final String taskId) {
+    return !SOURCE_EXECUTOR.containsKey(taskId) && 
!SOURCE_TASK_MAP.containsKey(taskId);
+  }
+
+  @Override
+  public Optional<ExecutorService> getExecutor(final String taskId) {
+    return Optional.of(
+        
IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-source-executor-" + 
taskId));
+  }
+
+  @Override
+  public void recordExecution(
+      final CollectorTask collectorTask, final ExecutorService 
executorService) {
+    final String taskId = collectorTask.getTaskId();
+    SOURCE_EXECUTOR.put(taskId, executorService);
+    SOURCE_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+    LOGGER.info("register collector source task {}", taskId);

Review Comment:
   ## Log Injection
   
   This log entry depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/12)



##########
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+public abstract class CollectorTaskExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CollectorTaskExecutor.class);
+
+  public void register(final CollectorTask collectorTask) {
+    if (validateIfAbsent(collectorTask.getTaskId())) {
+      getExecutor(collectorTask.getTaskId())
+          .ifPresent(
+              executor -> {
+                executor.submit(collectorTask);
+                LOGGER.info("register success {}", collectorTask.getTaskId());
+                recordExecution(collectorTask, executor);
+              });
+    } else {
+      LOGGER.warn("task {} has existed", collectorTask.getTaskId());
+    }
+  }
+
+  public abstract boolean validateIfAbsent(final String taskId);
+
+  public abstract Optional<ExecutorService> getExecutor(final String taskId);
+
+  public abstract void recordExecution(
+      final CollectorTask collectorTask, final ExecutorService 
executorService);
+
+  public void deregister(final String taskId) {
+    if (!validateIfAbsent(taskId)) {
+      eraseExecution(taskId);
+    } else {
+      LOGGER.warn("task {} has not existed", taskId);

Review Comment:
   ## Log Injection
   
   This log entry depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/7)



##########
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorSinkTaskExecutor extends CollectorTaskExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CollectorSinkTaskExecutor.class);
+
+  private static final Map<String, ExecutorService> SINK_EXECUTOR = new 
ConcurrentHashMap<>();
+  private static final Map<String, CollectorTask> SINK_TASK_MAP = new 
ConcurrentHashMap<>();
+
+  public boolean validateIfAbsent(final String taskId) {
+    return !SINK_EXECUTOR.containsKey(taskId) && 
!SINK_TASK_MAP.containsKey(taskId);
+  }
+
+  @Override
+  public Optional<ExecutorService> getExecutor(final String taskId) {
+    return Optional.of(
+        
IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-sink-executor-" + 
taskId));
+  }
+
+  @Override
+  public void recordExecution(
+      final CollectorTask collectorTask, final ExecutorService 
executorService) {
+    final String taskId = collectorTask.getTaskId();
+    SINK_EXECUTOR.putIfAbsent(taskId, executorService);
+    SINK_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+    LOGGER.info("register collector sink task {}", taskId);

Review Comment:
   ## Log Injection
   
   This log entry depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/10)



##########
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorProcessorTaskExecutor extends CollectorTaskExecutor {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(CollectorProcessorTaskExecutor.class);
+
+  private static final Map<String, ExecutorService> PROCESSOR_EXECUTOR = new 
ConcurrentHashMap<>();
+  private static final Map<String, CollectorTask> PROCESSOR_TASK_MAP = new 
ConcurrentHashMap<>();
+
+  public boolean validateIfAbsent(final String taskId) {
+    return !PROCESSOR_EXECUTOR.containsKey(taskId) && 
!PROCESSOR_TASK_MAP.containsKey(taskId);
+  }
+
+  @Override
+  public Optional<ExecutorService> getExecutor(final String taskId) {
+    return Optional.of(
+        
IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-processor-executor-" 
+ taskId));
+  }
+
+  @Override
+  public void recordExecution(
+      final CollectorTask collectorTask, final ExecutorService 
executorService) {
+    final String taskId = collectorTask.getTaskId();
+    PROCESSOR_EXECUTOR.putIfAbsent(taskId, executorService);
+    PROCESSOR_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+    LOGGER.info("register collector processor task {}", taskId);

Review Comment:
   ## Log Injection
   
   This log entry depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/8)



##########
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+public abstract class CollectorTaskExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CollectorTaskExecutor.class);
+
+  public void register(final CollectorTask collectorTask) {
+    if (validateIfAbsent(collectorTask.getTaskId())) {
+      getExecutor(collectorTask.getTaskId())
+          .ifPresent(
+              executor -> {
+                executor.submit(collectorTask);
+                LOGGER.info("register success {}", collectorTask.getTaskId());

Review Comment:
   ## Log Injection
   
   This log entry depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/5)



##########
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.impl;
+
+import org.apache.iotdb.collector.agent.CollectorAgent;
+import org.apache.iotdb.collector.api.v1.AdminApiService;
+import org.apache.iotdb.collector.api.v1.NotFoundException;
+import org.apache.iotdb.collector.api.v1.handler.RequestValidationHandler;
+import org.apache.iotdb.collector.api.v1.model.AlterPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.CreatePipeRequest;
+import org.apache.iotdb.collector.api.v1.model.DropPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.StartPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.StopPipeRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+public class AdminApiServiceImpl extends AdminApiService {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AdminApiServiceImpl.class);
+
+  @Override
+  public Response alterPipe(
+      final AlterPipeRequest alterPipeRequest, final SecurityContext 
securityContext)
+      throws NotFoundException {
+    return Response.ok("alterPipe").build();
+  }
+
+  @Override
+  public Response createPipe(
+      final CreatePipeRequest createPipeRequest, final SecurityContext 
securityContext)
+      throws NotFoundException {
+    RequestValidationHandler.validateCreateRequest(createPipeRequest);
+
+    final boolean createdResult =
+        CollectorAgent.task()
+            .createCollectorTask(
+                createPipeRequest.getSourceAttribute(),
+                createPipeRequest.getProcessorAttribute(),
+                createPipeRequest.getSinkAttribute(),
+                createPipeRequest.getTaskId());
+    if (createdResult) {
+      LOGGER.info("Create task successful");
+      return Response.status(Response.Status.OK).entity("create task 
success").build();
+    }
+    LOGGER.warn("Create task failed");
+    return Response.status(Response.Status.BAD_REQUEST).entity("create task 
fail").build();
+  }
+
+  @Override
+  public Response dropPipe(
+      final DropPipeRequest dropPipeRequest, final SecurityContext 
securityContext)
+      throws NotFoundException {
+    return Response.ok("dropPipe").build();
+  }
+
+  @Override
+  public Response startPipe(
+      final StartPipeRequest startPipeRequest, final SecurityContext 
securityContext)
+      throws NotFoundException {
+    return Response.ok("startPipe").build();
+  }
+
+  @Override
+  public Response stopPipe(
+      final StopPipeRequest stopPipeRequest, final SecurityContext 
securityContext)
+      throws NotFoundException {
+    RequestValidationHandler.validateStopRequest(stopPipeRequest);
+
+    final boolean stopResult = 
CollectorAgent.task().stopCollectorTask(stopPipeRequest.getTaskId());
+    if (stopResult) {
+      LOGGER.info("Stop task: {} successful", stopPipeRequest.getTaskId());

Review Comment:
   ## Log Injection
   
   This log entry depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/14)



##########
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+public abstract class CollectorTaskExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CollectorTaskExecutor.class);
+
+  public void register(final CollectorTask collectorTask) {
+    if (validateIfAbsent(collectorTask.getTaskId())) {
+      getExecutor(collectorTask.getTaskId())
+          .ifPresent(
+              executor -> {
+                executor.submit(collectorTask);
+                LOGGER.info("register success {}", collectorTask.getTaskId());
+                recordExecution(collectorTask, executor);
+              });
+    } else {
+      LOGGER.warn("task {} has existed", collectorTask.getTaskId());

Review Comment:
   ## Log Injection
   
   This log entry depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/6)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to