bejancsaba commented on a change in pull request #5755:
URL: https://github.com/apache/nifi/pull/5755#discussion_r805041949



##########
File path: 
minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java
##########
@@ -81,6 +82,11 @@ public InputStream getInputStream() throws 
ConfigurationProviderException {
     return s3Object.getObjectContent();
   }
 
+  @Override
+  public URL getURL() throws ConfigurationProviderException {

Review comment:
       I think no ConfigurationProviderException is thrown here.

##########
File path: 
minifi/minifi-c2/minifi-c2-assembly/src/main/resources/conf/authorizations.yaml
##########
@@ -37,3 +37,31 @@ Paths:
     # Default authorization lets anonymous pull any config.  Remove below to 
change that.
     - Authorization: ROLE_ANONYMOUS
       Action: allow
+
+  /c2/config/heartbeat:

Review comment:
       Heartbeat and acknowledge seem like a higher-level concept than config, 
so they could be placed directly under c2 instead of under config. 
What do you think?

##########
File path: 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/SimpleC2ProtocolService.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.service;
+
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationState;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.apache.nifi.minifi.c2.util.LogMarkerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+@Service
+public class SimpleC2ProtocolService implements C2ProtocolService {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(SimpleC2ProtocolService.class);
+
+    private static final Set<String> issuedOperationIds = new HashSet<>();
+
+    public SimpleC2ProtocolService() {
+    }
+
+    @Override
+    public void processOperationAck(final C2OperationAck operationAck, final 
C2ProtocolContext context) {
+        // This service assumes there is a single Operation UPDATE to pass 
over the updated flow
+        Marker marker = LogMarkerUtil.getMarker(operationAck);
+
+        logger.debug(marker, "Received operation acknowledgement: {}; {}", 
operationAck, context);
+        // Remove the operator ID from the list of issued operations and log 
the state
+        final String operationId = operationAck.getOperationId();
+        try {
+            OperationState opState = OperationState.DONE;
+            String details = null;
+
+            /* Partial applications are rare and only happen when an operation 
consists of updating multiple config
+             * items and some succeed ( we don't yet have the concept of 
rollback in agents ).
+             * Fully Applied yields an operation success.
+             * Operation Not Understood and Not Applied give little details 
but also will result in Operation Failure.
+             * We should explore if providing textual details. */
+            final C2OperationState c2OperationState = 
operationAck.getOperationState();
+            if (null != c2OperationState) {
+                details = c2OperationState.getDetails();
+                if (c2OperationState.getState() != 
C2OperationState.OperationState.FULLY_APPLIED) {
+                    opState = OperationState.FAILED;
+                }
+            }
+
+            if (!issuedOperationIds.remove(operationId)) {
+                logger.warn(marker, "Operation with ID " + operationId + " has 
either already been acknowledged or is unknown to this server");
+            } else if (null != c2OperationState) {
+                final C2OperationState.OperationState operationState = 
c2OperationState.getState();
+                logger.debug("Operation with ID " + operationId + " 
acknowledged with a state of " + operationState.name() + "(" + opState.name() + 
"), details = "
+                        + (details == null ? "" : details));
+            }
+
+            // Optionally, an acknowledgement can include some of the info 
normally passed in a heartbeat.
+            // If this info is present, process it as a heartbeat, so we 
update our latest known state of the agent.
+            if (operationAck.getAgentInfo() != null
+                    || operationAck.getDeviceInfo() != null
+                    || operationAck.getFlowInfo() != null) {
+                final C2Heartbeat heartbeatInfo = toHeartbeat(operationAck);
+                logger.trace(marker, "Operation acknowledgement contains 
additional info. Processing as heartbeat: {}", heartbeatInfo);
+                processHeartbeat(heartbeatInfo, context);
+            }
+
+        } catch (final Exception e) {
+            logger.warn("Encountered exception while processing operation 
ack", e);
+        }
+    }
+
+    @Override
+    public C2HeartbeatResponse processHeartbeat(final C2Heartbeat heartbeat, 
final C2ProtocolContext context) {

Review comment:
       With this implementation, every incoming heartbeat causes us to send an 
update operation to the agent, so the agent has to download the flow after each 
heartbeat. Was this the intention? Maybe we need to track which agents have 
already downloaded (acknowledged) the flow and issue the update operation 
only to the agents that have not downloaded it yet.

##########
File path: 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProvider.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.service;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.ext.Provider;
+
+@Provider
+@Produces(MediaType.APPLICATION_JSON)
+public class C2JsonProvider extends JacksonJaxbJsonProvider {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    static {
+        // The Include.ALWAYS is for Jackson's "content" include which affects 
Maps and referential types,
+        // and we need to return null Map values to the front-end for the Map 
of property values so we use ALWAYS
+        
objectMapper.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL,
 JsonInclude.Include.ALWAYS));
+        objectMapper.setAnnotationIntrospector(new 
JaxbAnnotationIntrospector(objectMapper.getTypeFactory()));
+        
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);

Review comment:
       I think we only need this one; the other two config lines and the 
comment don't apply here.

##########
File path: 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java
##########
@@ -147,6 +180,140 @@ protected ConfigurationProviderInfo 
initContentTypeInfo(List<ConfigurationProvid
         return new ConfigurationProviderInfo(mediaTypeList, contentTypes, 
null);
     }
 
+    @POST
+    @Path("/heartbeat")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "An endpoint for a MiNiFi Agent to send a heartbeat to the 
C2 server",
+            response = C2HeartbeatResponse.class
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = MESSAGE_400)})
+    public Response heartbeat(
+            @Context HttpServletRequest request, @Context HttpHeaders 
httpHeaders, @Context UriInfo uriInfo,
+            @ApiParam(required = true) final C2Heartbeat heartbeat) {
+
+        logRequestEntry(heartbeat, heartbeat.getAgentId(), 
heartbeat.getIdentifier());
+
+        try {
+            
authorizer.authorize(SecurityContextHolder.getContext().getAuthentication(), 
uriInfo);
+        } catch (AuthorizationException e) {
+            logger.warn(HttpRequestUtil.getClientString(request) + " not 
authorized to access " + uriInfo, e);
+            return Response.status(403).build();
+        }
+
+        List<MediaType> acceptValues = httpHeaders.getAcceptableMediaTypes();
+        boolean defaultAccept = false;
+        if (acceptValues.size() == 0) {
+            acceptValues = Collections.singletonList(MediaType.WILDCARD_TYPE);
+            defaultAccept = true;
+        }
+        if (logger.isDebugEnabled()) {
+            StringBuilder builder = new StringBuilder("Handling request from ")
+                    .append(HttpRequestUtil.getClientString(request))
+                    .append(" with Accept");
+            if (defaultAccept) {
+                builder.append(" default value");
+            }
+            builder.append(": ")
+                    
.append(acceptValues.stream().map(Object::toString).collect(Collectors.joining(",
 ")));
+            logger.debug(builder.toString());
+        }
+
+        try {
+            Map<String, List<String>> parameters = 
Collections.singletonMap("class", 
Collections.singletonList(heartbeat.getAgentClass()));
+            ConfigurationProviderValue configurationProviderValue = 
configurationCache.get(new ConfigurationProviderKey(acceptValues, parameters));
+            org.apache.nifi.minifi.c2.api.Configuration configuration = 
configurationProviderValue.getConfiguration();
+            final C2ProtocolContext heartbeatContext = 
C2ProtocolContext.builder()
+                    .baseUri(configuration.getURL().toURI())
+                    
.contentLength(httpServletRequest.getHeader(CONTENT_LENGTH))
+                    .build();
+
+            Response response;
+
+            try {
+                final C2HeartbeatResponse heartbeatResponse = 
c2ProtocolService.processHeartbeat(heartbeat, heartbeatContext);
+                response = Response.ok(heartbeatResponse).build();
+            } catch (Exception e) {
+                logger.error("Heartbeat processing failed", e);
+                response = 
Response.status(BAD_REQUEST).entity(e.getMessage()).build();
+            }
+            logRequestProcessingFinished(heartbeat.getAgentId(), 
heartbeat.getIdentifier());
+            return response;
+        } catch (AuthorizationException e) {
+            logger.warn(HttpRequestUtil.getClientString(request) + " not 
authorized to access " + uriInfo, e);
+            return Response.status(403).build();
+        } catch (InvalidParameterException e) {
+            logger.info(HttpRequestUtil.getClientString(request) + " made 
invalid request with " + HttpRequestUtil.getQueryString(request), e);
+            return Response.status(400).entity("Invalid request.").build();
+        } catch (ConfigurationProviderException | URISyntaxException e) {
+            logger.warn("Unable to get configuration.", e);
+            return Response.status(500).build();
+        } catch (ExecutionException | UncheckedExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof WebApplicationException) {
+                throw (WebApplicationException) cause;
+            }
+            logger.error(HttpRequestUtil.getClientString(request) + " made 
request with " + HttpRequestUtil.getQueryString(request) + " that caused 
error.", cause);
+            return Response.status(500).entity("Internal error").build();
+        }
+    }
+
+    @POST
+    @Path("/acknowledge")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "An endpoint for a MiNiFi Agent to send an operation 
acknowledgement to the C2 server"
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = MESSAGE_400)})
+    public Response acknowledge(
+            @ApiParam(required = true) final C2OperationAck operationAck) {
+
+        Optional<String> agentId = getAgentId(operationAck);
+
+        agentId.ifPresent(id -> logRequestEntry(operationAck, id, 
operationAck.getOperationId()));
+
+        final C2ProtocolContext ackContext = C2ProtocolContext.builder()
+                .baseUri(getBaseUri())
+                .contentLength(httpServletRequest.getHeader(CONTENT_LENGTH))
+                .build();
+
+        c2ProtocolService.processOperationAck(operationAck, ackContext);
+
+        agentId.ifPresent(id -> logRequestProcessingFinished(id, 
operationAck.getOperationId()));
+
+        return Response.ok().build();
+
+    }
+
+    private Optional<String> getAgentId(C2OperationAck operationAck) {
+        Optional<String> agentId;

Review comment:
       You could replace the whole body of this function with
   ```
   return 
Optional.ofNullable(operationAck.getAgentInfo()).map(AgentInfo::getIdentifier);
   ```
   Those should be equivalent and it is more fluent this way in my opinion. 
What do you think?

##########
File path: 
minifi/minifi-c2/minifi-c2-assembly/src/main/resources/conf/authorizations.yaml
##########
@@ -37,3 +37,31 @@ Paths:
     # Default authorization lets anonymous pull any config.  Remove below to 
change that.
     - Authorization: ROLE_ANONYMOUS
       Action: allow
+
+  /c2/config/heartbeat:
+    Default Action: deny
+    Actions:
+      - Authorization: CLASS_RASPI_3
+        Query Parameters:
+          class: raspi3
+        Action: allow
+      - Authorization: ROLE_SUPERUSER
+        Action: allow
+
+      # Default authorization lets anonymous pull any config.  Remove below to 
change that.

Review comment:
       I suppose this is just a copy-paste. I don't think "config" has any 
meaning for these endpoints.

##########
File path: 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java
##########
@@ -147,6 +180,140 @@ protected ConfigurationProviderInfo 
initContentTypeInfo(List<ConfigurationProvid
         return new ConfigurationProviderInfo(mediaTypeList, contentTypes, 
null);
     }
 
+    @POST
+    @Path("/heartbeat")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "An endpoint for a MiNiFi Agent to send a heartbeat to the 
C2 server",
+            response = C2HeartbeatResponse.class
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = MESSAGE_400)})
+    public Response heartbeat(
+            @Context HttpServletRequest request, @Context HttpHeaders 
httpHeaders, @Context UriInfo uriInfo,
+            @ApiParam(required = true) final C2Heartbeat heartbeat) {
+
+        logRequestEntry(heartbeat, heartbeat.getAgentId(), 
heartbeat.getIdentifier());
+
+        try {
+            
authorizer.authorize(SecurityContextHolder.getContext().getAuthentication(), 
uriInfo);
+        } catch (AuthorizationException e) {
+            logger.warn(HttpRequestUtil.getClientString(request) + " not 
authorized to access " + uriInfo, e);
+            return Response.status(403).build();
+        }
+
+        List<MediaType> acceptValues = httpHeaders.getAcceptableMediaTypes();
+        boolean defaultAccept = false;
+        if (acceptValues.size() == 0) {
+            acceptValues = Collections.singletonList(MediaType.WILDCARD_TYPE);
+            defaultAccept = true;
+        }
+        if (logger.isDebugEnabled()) {
+            StringBuilder builder = new StringBuilder("Handling request from ")
+                    .append(HttpRequestUtil.getClientString(request))
+                    .append(" with Accept");
+            if (defaultAccept) {
+                builder.append(" default value");
+            }
+            builder.append(": ")
+                    
.append(acceptValues.stream().map(Object::toString).collect(Collectors.joining(",
 ")));
+            logger.debug(builder.toString());
+        }
+
+        try {
+            Map<String, List<String>> parameters = 
Collections.singletonMap("class", 
Collections.singletonList(heartbeat.getAgentClass()));
+            ConfigurationProviderValue configurationProviderValue = 
configurationCache.get(new ConfigurationProviderKey(acceptValues, parameters));
+            org.apache.nifi.minifi.c2.api.Configuration configuration = 
configurationProviderValue.getConfiguration();
+            final C2ProtocolContext heartbeatContext = 
C2ProtocolContext.builder()
+                    .baseUri(configuration.getURL().toURI())
+                    
.contentLength(httpServletRequest.getHeader(CONTENT_LENGTH))
+                    .build();
+
+            Response response;
+
+            try {
+                final C2HeartbeatResponse heartbeatResponse = 
c2ProtocolService.processHeartbeat(heartbeat, heartbeatContext);
+                response = Response.ok(heartbeatResponse).build();
+            } catch (Exception e) {
+                logger.error("Heartbeat processing failed", e);
+                response = 
Response.status(BAD_REQUEST).entity(e.getMessage()).build();
+            }
+            logRequestProcessingFinished(heartbeat.getAgentId(), 
heartbeat.getIdentifier());
+            return response;
+        } catch (AuthorizationException e) {
+            logger.warn(HttpRequestUtil.getClientString(request) + " not 
authorized to access " + uriInfo, e);
+            return Response.status(403).build();
+        } catch (InvalidParameterException e) {
+            logger.info(HttpRequestUtil.getClientString(request) + " made 
invalid request with " + HttpRequestUtil.getQueryString(request), e);
+            return Response.status(400).entity("Invalid request.").build();
+        } catch (ConfigurationProviderException | URISyntaxException e) {
+            logger.warn("Unable to get configuration.", e);
+            return Response.status(500).build();
+        } catch (ExecutionException | UncheckedExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof WebApplicationException) {
+                throw (WebApplicationException) cause;
+            }
+            logger.error(HttpRequestUtil.getClientString(request) + " made 
request with " + HttpRequestUtil.getQueryString(request) + " that caused 
error.", cause);
+            return Response.status(500).entity("Internal error").build();
+        }
+    }
+
+    @POST
+    @Path("/acknowledge")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "An endpoint for a MiNiFi Agent to send an operation 
acknowledgement to the C2 server"
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = MESSAGE_400)})
+    public Response acknowledge(
+            @ApiParam(required = true) final C2OperationAck operationAck) {
+
+        Optional<String> agentId = getAgentId(operationAck);
+
+        agentId.ifPresent(id -> logRequestEntry(operationAck, id, 
operationAck.getOperationId()));
+
+        final C2ProtocolContext ackContext = C2ProtocolContext.builder()
+                .baseUri(getBaseUri())
+                .contentLength(httpServletRequest.getHeader(CONTENT_LENGTH))
+                .build();
+
+        c2ProtocolService.processOperationAck(operationAck, ackContext);
+
+        agentId.ifPresent(id -> logRequestProcessingFinished(id, 
operationAck.getOperationId()));
+
+        return Response.ok().build();
+
+    }
+
+    private Optional<String> getAgentId(C2OperationAck operationAck) {
+        Optional<String> agentId;
+        if (operationAck.getAgentInfo() != null) {
+            agentId = 
Optional.ofNullable(operationAck.getAgentInfo().getIdentifier());
+        } else {
+            agentId = Optional.empty();
+        }
+
+        return agentId;
+    }
+
+    private void logRequestEntry(Serializable request, String agentId, String 
requestId) {
+        Marker marker = getMarker(agentId);
+        logger.debug(marker, "Incoming request from agent [{}] with request id 
[{}]", agentId, requestId);

Review comment:
       I think all of the logic around markers could be removed. As there is no 
TurboFilter defined anywhere with proper MatchingFilter logic, all of this is 
dead code because the markers are never utilised.

##########
File path: 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/util/LogMarkerUtil.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.util;
+
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+
+public class LogMarkerUtil {

Review comment:
       As mentioned above all of this could be removed in my opinion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to