DaanHoogland commented on code in PR #8674: URL: https://github.com/apache/cloudstack/pull/8674#discussion_r1522952722
########## framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.framework.events; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.PostConstruct; + +import org.apache.commons.lang3.StringUtils; + +import com.cloud.utils.component.ManagerBase; + +public class EventDistributorImpl extends ManagerBase implements EventDistributor { + + List<EventBus> eventBuses; + + public void setEventBuses(List<EventBus> eventBuses) { + this.eventBuses = eventBuses; + } + + @PostConstruct + public void init() { + if (logger.isTraceEnabled()) { + logger.trace("Found {} event buses : {}", eventBuses.size(), + StringUtils.join(eventBuses.stream().map(x->x.getClass().getName()).toArray())); + } + } + + @Override + public Map<String, EventBusException> publish(Event event) { + Map<String, EventBusException> exceptions = new HashMap<>(); + if (event == null) { + return exceptions; + } + if (logger.isTraceEnabled()) { + logger.trace("Publishing event [category: {}, type: {}]: {} to {} event buses", + event.getEventCategory(), event.getEventType(), + event.getDescription(), eventBuses.size()); + } Review Comment: ```suggestion logger.trace("Publishing event [category: {}, type: {}]: {} to {} event buses", event.getEventCategory(), event.getEventType(), event.getDescription(), eventBuses.size()); ``` ########## framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.framework.events; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.PostConstruct; + +import org.apache.commons.lang3.StringUtils; + +import com.cloud.utils.component.ManagerBase; + +public class EventDistributorImpl extends ManagerBase implements EventDistributor { + + List<EventBus> eventBuses; + + public void setEventBuses(List<EventBus> eventBuses) { + this.eventBuses = eventBuses; + } + + @PostConstruct + public void init() { + if (logger.isTraceEnabled()) { Review Comment: this is the logger from `ComponentLifecycleBase`. Is that intentional? ########## plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java: ########## @@ -68,6 +72,9 @@ public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws Event @Override public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException { + if (logger.isDebugEnabled()) { + logger.debug("unsubscribing '{}'", subscriberId); + } Review Comment: ```suggestion logger.debug("unsubscribing '{}'", subscriberId); ``` ########## plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java: ########## @@ -185,11 +185,14 @@ public static void setRetryInterval(Integer retryInterval) { */ @Override public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException { - if (subscriber == null || topic == null) { throw new EventBusException("Invalid EventSubscriber/EventTopic object passed."); } + if (logger.isDebugEnabled()) { + logger.debug("subscribing '{}' to events of type '{}' from '{}'", subscriber.toString(), topic.getEventType(), topic.getEventSource()); + } Review Comment: ```suggestion logger.debug("subscribing '{}' to events of type '{}' from '{}'", () -> subscriber.toString(), () -> topic.getEventType(), () -> topic.getEventSource()); ``` ########## plugins/event-bus/webhook/src/main/java/org/apache/cloudstack/mom/webhook/WebhookDeliveryThread.java: ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.cloudstack.mom.webhook; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import javax.crypto.Mac; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; + +import org.apache.cloudstack.framework.async.AsyncCompletionCallback; +import org.apache.cloudstack.framework.async.AsyncRpcContext; +import org.apache.cloudstack.framework.events.Event; +import org.apache.cloudstack.storage.command.CommandResult; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicHeader; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.util.EntityUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +public class WebhookDeliveryThread implements Runnable { + protected static Logger LOGGER = LogManager.getLogger(WebhookDeliveryThread.class); + + private static final String HEADER_X_CS_EVENT_ID = "X-CS-Event-ID"; + private static final String HEADER_X_CS_EVENT = "X-CS-Event"; + private static final String HEADER_X_CS_SIGNATURE = "X-CS-Signature"; + private static final String PREFIX_HEADER_USER_AGENT = "CS-Hookshot/"; + private final Webhook webhook; + private final Event event; + private CloseableHttpClient httpClient; + private String headers; + private String payload; + private String response; + private Date startTime; + private int deliveryRetries = 3; + private int deliveryTimeout = 10; + + AsyncCompletionCallback<WebhookDeliveryResult> callback; + + protected boolean isValidJson(String json) { + try { + new JSONObject(json); + } catch (JSONException ex) { + try { + new JSONArray(json); + } catch (JSONException ex1) { + return false; + } + } + return true; + } + + protected void setHttpClient() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + if (webhook.isSslVerification()) { + httpClient = HttpClients.createDefault(); + return; + } + httpClient = HttpClients + .custom() + .setSSLContext(new SSLContextBuilder().loadTrustMaterial(null, + TrustAllStrategy.INSTANCE).build()) + .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .build(); + } + + protected HttpPost getBasicHttpPostRequest() throws URISyntaxException { + final URI uri = new URI(webhook.getPayloadUrl()); + HttpPost request = new HttpPost(); + RequestConfig.Builder requestConfig = RequestConfig.custom(); + requestConfig.setConnectTimeout(deliveryTimeout * 1000); + requestConfig.setConnectionRequestTimeout(deliveryTimeout * 1000); + requestConfig.setSocketTimeout(deliveryTimeout * 1000); + request.setConfig(requestConfig.build()); + request.setURI(uri); + return request; + } + + protected void updateRequestHeaders(HttpPost request) throws DecoderException, NoSuchAlgorithmException, + InvalidKeyException { + request.addHeader(HEADER_X_CS_EVENT_ID, event.getEventUuid()); + request.addHeader(HEADER_X_CS_EVENT, event.getEventType()); + request.setHeader(HttpHeaders.USER_AGENT, String.format("%s%s", PREFIX_HEADER_USER_AGENT, + event.getResourceAccountUuid())); + if (StringUtils.isNotBlank(webhook.getSecretKey())) { + request.addHeader(HEADER_X_CS_SIGNATURE, generateHMACSignature(payload, webhook.getSecretKey())); + } + List<Header> headers = new ArrayList<>(Arrays.asList(request.getAllHeaders())); + HttpEntity entity = request.getEntity(); + if (entity.getContentLength() > 0 && !request.containsHeader(HttpHeaders.CONTENT_LENGTH)) { + headers.add(new BasicHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(entity.getContentLength()))); + } + if (entity.getContentType() != null && !request.containsHeader(HttpHeaders.CONTENT_TYPE)) { + headers.add(entity.getContentType()); + } + if (entity.getContentEncoding() != null && !request.containsHeader(HttpHeaders.CONTENT_ENCODING)) { + headers.add(entity.getContentEncoding()); + } + this.headers = StringUtils.join(headers, "\n"); + } + + public WebhookDeliveryThread(Webhook webhook, Event event, + AsyncCompletionCallback<WebhookDeliveryResult> callback) { + this.webhook = webhook; + this.event = event; + this.callback = callback; + } + + public void setDeliveryRetries(int deliveryRetries) { + this.deliveryRetries = deliveryRetries; + } + + public void setDeliveryTimeout(int deliveryTimeout) { + this.deliveryTimeout = deliveryTimeout; + } + + @Override + public void run() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Delivering event: {} for {}", event.getEventType(), webhook); + } Review Comment: ```suggestion LOGGER.debug("Delivering event: {} for {}", event.getEventType(), webhook); ``` or ```suggestion LOGGER.debug("Delivering event: {} for {}", () -> event.getEventType(), () -> webhook.toString()); ``` depending on what kind of `Object` webhook is. I haven't found a `toString()` for it yet. ########## plugins/event-bus/webhook/src/main/java/org/apache/cloudstack/mom/webhook/WebhookDeliveryThread.java: ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.cloudstack.mom.webhook; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import javax.crypto.Mac; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; + +import org.apache.cloudstack.framework.async.AsyncCompletionCallback; +import org.apache.cloudstack.framework.async.AsyncRpcContext; +import org.apache.cloudstack.framework.events.Event; +import org.apache.cloudstack.storage.command.CommandResult; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicHeader; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.util.EntityUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +public class WebhookDeliveryThread implements Runnable { + protected static Logger LOGGER = LogManager.getLogger(WebhookDeliveryThread.class); + + private static final String HEADER_X_CS_EVENT_ID = "X-CS-Event-ID"; + private static final String HEADER_X_CS_EVENT = "X-CS-Event"; + private static final String HEADER_X_CS_SIGNATURE = "X-CS-Signature"; + private static final String PREFIX_HEADER_USER_AGENT = "CS-Hookshot/"; + private final Webhook webhook; + private final Event event; + private CloseableHttpClient httpClient; + private String headers; + private String payload; + private String response; + private Date startTime; + private int deliveryRetries = 3; + private int deliveryTimeout = 10; + + AsyncCompletionCallback<WebhookDeliveryResult> callback; + + protected boolean isValidJson(String json) { + try { + new JSONObject(json); + } catch (JSONException ex) { + try { + new JSONArray(json); + } catch (JSONException ex1) { + return false; + } + } + return true; + } + + protected void setHttpClient() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + if (webhook.isSslVerification()) { + httpClient = HttpClients.createDefault(); + return; + } + httpClient = HttpClients + .custom() + .setSSLContext(new SSLContextBuilder().loadTrustMaterial(null, + TrustAllStrategy.INSTANCE).build()) + .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .build(); + } + + protected HttpPost getBasicHttpPostRequest() throws URISyntaxException { + final URI uri = new URI(webhook.getPayloadUrl()); + HttpPost request = new HttpPost(); + RequestConfig.Builder requestConfig = RequestConfig.custom(); + requestConfig.setConnectTimeout(deliveryTimeout * 1000); + requestConfig.setConnectionRequestTimeout(deliveryTimeout * 1000); + requestConfig.setSocketTimeout(deliveryTimeout * 1000); + request.setConfig(requestConfig.build()); + request.setURI(uri); + return request; + } + + protected void updateRequestHeaders(HttpPost request) throws DecoderException, NoSuchAlgorithmException, + InvalidKeyException { + request.addHeader(HEADER_X_CS_EVENT_ID, event.getEventUuid()); + request.addHeader(HEADER_X_CS_EVENT, event.getEventType()); + request.setHeader(HttpHeaders.USER_AGENT, String.format("%s%s", PREFIX_HEADER_USER_AGENT, + event.getResourceAccountUuid())); + if (StringUtils.isNotBlank(webhook.getSecretKey())) { + request.addHeader(HEADER_X_CS_SIGNATURE, generateHMACSignature(payload, webhook.getSecretKey())); + } + List<Header> headers = new ArrayList<>(Arrays.asList(request.getAllHeaders())); + HttpEntity entity = request.getEntity(); + if (entity.getContentLength() > 0 && !request.containsHeader(HttpHeaders.CONTENT_LENGTH)) { + headers.add(new BasicHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(entity.getContentLength()))); + } + if (entity.getContentType() != null && !request.containsHeader(HttpHeaders.CONTENT_TYPE)) { + headers.add(entity.getContentType()); + } + if (entity.getContentEncoding() != null && !request.containsHeader(HttpHeaders.CONTENT_ENCODING)) { + headers.add(entity.getContentEncoding()); + } + this.headers = StringUtils.join(headers, "\n"); + } + + public WebhookDeliveryThread(Webhook webhook, Event event, + AsyncCompletionCallback<WebhookDeliveryResult> callback) { + this.webhook = webhook; + this.event = event; + this.callback = callback; + } + + public void setDeliveryRetries(int deliveryRetries) { + this.deliveryRetries = deliveryRetries; + } + + public void setDeliveryTimeout(int deliveryTimeout) { + this.deliveryTimeout = deliveryTimeout; + } + + @Override + public void run() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Delivering event: {} for {}", event.getEventType(), webhook); + } + if (event == null) { + LOGGER.warn("Invalid event received for delivering to {}", webhook); + return; + } + payload = event.getDescription(); + int attempt = 0; + boolean success = false; + try { + setHttpClient(); + } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) { + response = String.format("Failed to initiate delivery due to : %s", e.getMessage()); + callback.complete(new WebhookDeliveryResult(headers, payload, success, response, new Date())); + return; + } + while (attempt < deliveryRetries) { + attempt++; + if (delivery(attempt)) { + success = true; + break; + } + } + callback.complete(new WebhookDeliveryResult(headers, payload, success, response, startTime)); + } + + protected void updateResponseFromRequest(HttpEntity entity) { + try { + this.response = EntityUtils.toString(entity, StandardCharsets.UTF_8); + } catch (IOException e) { + LOGGER.error("Failed to parse response for event: {} for {}", + event.getEventType(), webhook); + this.response = ""; + } + } + + protected boolean delivery(int attempt) { + startTime = new Date(); + try { + HttpPost request = getBasicHttpPostRequest(); + StringEntity input = new StringEntity(payload, + isValidJson(payload) ? ContentType.APPLICATION_JSON : ContentType.TEXT_PLAIN); + request.setEntity(input); + updateRequestHeaders(request); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Delivering event: {} for {} with timeout: {}, " + + "attempt #{}", event.getEventType(), webhook, + deliveryTimeout, attempt); + } Review Comment: ```suggestion LOGGER.trace("Delivering event: {} for {} with timeout: {}, attempt #{}", event.getEventType(), webhook, deliveryTimeout, attempt); ``` ########## framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.framework.events; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.PostConstruct; + +import org.apache.commons.lang3.StringUtils; + +import com.cloud.utils.component.ManagerBase; + +public class EventDistributorImpl extends ManagerBase implements EventDistributor { + + List<EventBus> eventBuses; + + public void setEventBuses(List<EventBus> eventBuses) { + this.eventBuses = eventBuses; + } + + @PostConstruct + public void init() { + if (logger.isTraceEnabled()) { + logger.trace("Found {} event buses : {}", eventBuses.size(), + StringUtils.join(eventBuses.stream().map(x->x.getClass().getName()).toArray())); + } Review Comment: ```suggestion logger.trace("Found {} event buses : {}", () -> eventBuses.size(), () -> StringUtils.join(eventBuses.stream().map(x->x.getClass().getName()).toArray())); ``` ########## plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java: ########## @@ -250,6 +253,9 @@ public void handleDelivery(String queueName, Envelope envelope, AMQP.BasicProper @Override public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException { + if (logger.isDebugEnabled()) { + logger.debug("unsubscribing '{}'", subscriberId); + } Review Comment: ```suggestion logger.debug("unsubscribing '{}'", subscriberId); ``` ########## plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java: ########## @@ -60,6 +60,10 @@ public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws Event if (subscriber == null || topic == null) { throw new EventBusException("Invalid EventSubscriber/EventTopic object passed."); } + if (logger.isDebugEnabled()) { + logger.debug("subscribing '{}' to events of type '{}' from '{}'", subscriber.toString(), topic.getEventType(), topic.getEventSource()); + } Review Comment: ```suggestion logger.debug("subscribing '{}' to events of type '{}' from '{}'", () -> subscriber.toString(), () -> topic.getEventType(), () -> topic.getEventSource()); ``` ########## plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java: ########## @@ -87,19 +87,29 @@ public void setName(String name) { @Override public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException { + if (logger.isDebugEnabled()) { + logger.debug("subscribing '{}' to events of type '{}' from '{}'", subscriber.toString(), topic.getEventType(), topic.getEventSource()); + } + /* NOOP */ return UUID.randomUUID(); } @Override public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException { + if (logger.isDebugEnabled()) { + logger.debug("unsubscribing '{}'", subscriberId); + } /* NOOP */ } @Override public void publish(Event event) throws EventBusException { - ProducerRecord<String, String> record = new ProducerRecord<String,String>(_topic, event.getResourceUUID(), event.getDescription()); - _producer.send(record); + if (logger.isTraceEnabled()) { + logger.trace("publish '{}'", event.getDescription()); + } Review Comment: ```suggestion logger.trace("publish '{}'", event.getDescription()); ``` ########## plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java: ########## @@ -87,19 +87,29 @@ public void setName(String name) { @Override public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException { + if (logger.isDebugEnabled()) { + logger.debug("subscribing '{}' to events of type '{}' from '{}'", subscriber.toString(), topic.getEventType(), topic.getEventSource()); + } + /* NOOP */ return UUID.randomUUID(); } @Override public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException { + if (logger.isDebugEnabled()) { + logger.debug("unsubscribing '{}'", subscriberId); + } Review Comment: ```suggestion logger.debug("unsubscribing '{}'", subscriberId); ``` ########## plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java: ########## @@ -87,19 +87,29 @@ public void setName(String name) { @Override public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException { + if (logger.isDebugEnabled()) { + logger.debug("subscribing '{}' to events of type '{}' from '{}'", subscriber.toString(), topic.getEventType(), topic.getEventSource()); + } Review Comment: ```suggestion logger.debug("subscribing '{}' to events of type '{}' from '{}'", () -> subscriber.toString(), () -> topic.getEventType(), () -> topic.getEventSource()); ``` ########## plugins/event-bus/webhook/src/main/java/org/apache/cloudstack/mom/webhook/WebhookApiServiceImpl.java: ########## @@ -0,0 +1,533 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.cloudstack.mom.webhook; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import javax.inject.Inject; + +import org.apache.cloudstack.acl.SecurityChecker; +import org.apache.cloudstack.api.ApiConstants; +import org.apache.cloudstack.api.response.ListResponse; +import org.apache.cloudstack.context.CallContext; +import org.apache.cloudstack.mom.webhook.api.command.user.CreateWebhookCmd; +import org.apache.cloudstack.mom.webhook.api.command.user.DeleteWebhookCmd; +import org.apache.cloudstack.mom.webhook.api.command.user.DeleteWebhookDeliveryCmd; +import org.apache.cloudstack.mom.webhook.api.command.user.ExecuteWebhookDeliveryCmd; +import org.apache.cloudstack.mom.webhook.api.command.user.ListWebhookDeliveriesCmd; +import org.apache.cloudstack.mom.webhook.api.command.user.ListWebhooksCmd; +import org.apache.cloudstack.mom.webhook.api.command.user.UpdateWebhookCmd; +import org.apache.cloudstack.mom.webhook.api.response.WebhookDeliveryResponse; +import org.apache.cloudstack.mom.webhook.api.response.WebhookResponse; +import org.apache.cloudstack.mom.webhook.dao.WebhookDao; +import org.apache.cloudstack.mom.webhook.dao.WebhookDeliveryDao; +import org.apache.cloudstack.mom.webhook.dao.WebhookDeliveryJoinDao; +import org.apache.cloudstack.mom.webhook.dao.WebhookJoinDao; +import org.apache.cloudstack.mom.webhook.vo.WebhookDeliveryJoinVO; +import org.apache.cloudstack.mom.webhook.vo.WebhookDeliveryVO; +import org.apache.cloudstack.mom.webhook.vo.WebhookJoinVO; +import org.apache.cloudstack.mom.webhook.vo.WebhookVO; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; + +import com.cloud.api.ApiResponseHelper; +import com.cloud.cluster.ManagementServerHostVO; +import com.cloud.cluster.dao.ManagementServerHostDao; +import com.cloud.domain.Domain; +import com.cloud.domain.dao.DomainDao; +import com.cloud.exception.InvalidParameterValueException; +import com.cloud.exception.PermissionDeniedException; +import com.cloud.projects.Project; +import com.cloud.user.Account; +import com.cloud.user.AccountManager; +import com.cloud.utils.Pair; +import com.cloud.utils.Ternary; +import com.cloud.utils.UriUtils; +import com.cloud.utils.component.ManagerBase; +import com.cloud.utils.db.Filter; +import com.cloud.utils.db.SearchBuilder; +import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.exception.CloudRuntimeException; +import com.cloud.utils.rest.HttpConstants; + +public class WebhookApiServiceImpl extends ManagerBase implements WebhookApiService { + + @Inject + AccountManager accountManager; + @Inject + DomainDao domainDao; + @Inject + WebhookDao webhookDao; + @Inject + WebhookJoinDao webhookJoinDao; + @Inject + WebhookDeliveryDao webhookDeliveryDao; + @Inject + WebhookDeliveryJoinDao webhookDeliveryJoinDao; + @Inject + ManagementServerHostDao managementServerHostDao; + @Inject + WebhookService webhookService; + + protected WebhookResponse createWebhookResponse(WebhookJoinVO webhookVO) { + WebhookResponse response = new WebhookResponse(); + response.setObjectName("webhook"); + response.setId(webhookVO.getUuid()); + response.setName(webhookVO.getName()); + response.setDescription(webhookVO.getDescription()); + ApiResponseHelper.populateOwner(response, webhookVO); + response.setState(webhookVO.getState().toString()); + response.setPayloadUrl(webhookVO.getPayloadUrl()); + response.setSecretKey(webhookVO.getSecretKey()); + response.setSslVerification(webhookVO.isSslVerification()); + response.setScope(webhookVO.getScope().toString()); + response.setCreated(webhookVO.getCreated()); + return response; + } + + protected List<Long> getIdsOfAccessibleWebhooks(Account caller) { + if (Account.Type.ADMIN.equals(caller.getType())) { + return new ArrayList<>(); + } + String domainPath = null; + if (Account.Type.DOMAIN_ADMIN.equals(caller.getType())) { + Domain domain = domainDao.findById(caller.getDomainId()); + domainPath = domain.getPath(); + } + List<WebhookJoinVO> webhooks = webhookJoinDao.listByAccountOrDomain(caller.getId(), domainPath); + return webhooks.stream().map(WebhookJoinVO::getId).collect(Collectors.toList()); + } + + protected ManagementServerHostVO basicWebhookDeliveryApiCheck(Account caller, final Long id, final Long webhookId, Review Comment: this seems to only be called internally. Can it be ```suggestion private ManagementServerHostVO basicWebhookDeliveryApiCheck(Account caller, final Long id, final Long webhookId, ``` ########## plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java: ########## @@ -265,6 +271,9 @@ public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws Ev // publish event on to the exchange created on AMQP server @Override public void publish(Event event) throws EventBusException { + if (logger.isTraceEnabled()) { + logger.trace("publish '{}'", event.getDescription()); + } Review Comment: ```suggestion logger.trace("publish '{}'", event.getDescription()); ``` ########## plugins/event-bus/webhook/src/main/java/org/apache/cloudstack/mom/webhook/api/command/user/ListWebhooksCmd.java: ########## @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.cloudstack.mom.webhook.api.command.user; + + +import javax.inject.Inject; + +import org.apache.cloudstack.acl.RoleType; +import org.apache.cloudstack.api.APICommand; +import org.apache.cloudstack.api.ApiConstants; +import org.apache.cloudstack.api.BaseListProjectAndAccountResourcesCmd; +import org.apache.cloudstack.api.Parameter; +import org.apache.cloudstack.api.ResponseObject; +import org.apache.cloudstack.api.ServerApiException; +import org.apache.cloudstack.api.response.ListResponse; +import org.apache.cloudstack.mom.webhook.WebhookApiService; +import org.apache.cloudstack.mom.webhook.Webhook; +import org.apache.cloudstack.mom.webhook.api.response.WebhookResponse; + +@APICommand(name = "listWebhooks", + description = "Lists Webhooks", + responseObject = WebhookResponse.class, + responseView = ResponseObject.ResponseView.Restricted, + entityType = {Webhook.class}, + authorized = {RoleType.Admin, RoleType.ResourceAdmin, RoleType.DomainAdmin, RoleType.User}, + since = "4.20.0") +public class ListWebhooksCmd extends BaseListProjectAndAccountResourcesCmd { + + @Inject + WebhookApiService webhookApiService; + + ///////////////////////////////////////////////////// + //////////////// API parameters ///////////////////// + ///////////////////////////////////////////////////// + @Parameter(name = ApiConstants.ID, type = CommandType.UUID, + entityType = WebhookResponse.class, + description = "The ID of the Webhooks") Review Comment: Is this the single id of multiple webhooks? seems lika an id would be of a webhook, not webhook**s** ########## server/src/main/java/com/cloud/projects/ProjectManagerImpl.java: ########## @@ -163,6 +168,19 @@ public class ProjectManagerImpl extends ManagerBase implements ProjectManager, C private String senderAddress; protected SMTPMailSender mailSender; + protected List<? extends ControlledEntity> listWebhooksForProject(Project project) { + List<? extends ControlledEntity> webhooks = new ArrayList<>(); + try { + WebhookHelper webhookService = ComponentContext.getDelegateComponentOfType(WebhookHelper.class); + webhooks = webhookService.listWebhooksByAccount(project.getProjectAccountId()); + } catch (NoSuchBeanDefinitionException ignored) { + if (logger.isDebugEnabled()) { + logger.debug("No WebhookHelper bean found"); + } Review Comment: ```suggestion logger.debug("No WebhookHelper bean found"); ``` ########## plugins/event-bus/webhook/src/main/java/org/apache/cloudstack/mom/webhook/api/command/user/ListWebhooksCmd.java: ########## @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.cloudstack.mom.webhook.api.command.user; + + +import javax.inject.Inject; + +import org.apache.cloudstack.acl.RoleType; +import org.apache.cloudstack.api.APICommand; +import org.apache.cloudstack.api.ApiConstants; +import org.apache.cloudstack.api.BaseListProjectAndAccountResourcesCmd; +import org.apache.cloudstack.api.Parameter; +import org.apache.cloudstack.api.ResponseObject; +import org.apache.cloudstack.api.ServerApiException; +import org.apache.cloudstack.api.response.ListResponse; +import org.apache.cloudstack.mom.webhook.WebhookApiService; +import org.apache.cloudstack.mom.webhook.Webhook; +import org.apache.cloudstack.mom.webhook.api.response.WebhookResponse; + +@APICommand(name = "listWebhooks", + description = "Lists Webhooks", + responseObject = WebhookResponse.class, + responseView = ResponseObject.ResponseView.Restricted, + entityType = {Webhook.class}, + authorized = {RoleType.Admin, RoleType.ResourceAdmin, RoleType.DomainAdmin, RoleType.User}, + since = "4.20.0") +public class ListWebhooksCmd extends BaseListProjectAndAccountResourcesCmd { + + @Inject + WebhookApiService webhookApiService; + + ///////////////////////////////////////////////////// + //////////////// API parameters ///////////////////// + ///////////////////////////////////////////////////// + @Parameter(name = ApiConstants.ID, type = CommandType.UUID, + entityType = WebhookResponse.class, + description = "The ID of the Webhooks") Review Comment: ```suggestion @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = WebhookResponse.class, description = "The ID of the Webhook") ``` ########## plugins/event-bus/webhook/src/main/java/org/apache/cloudstack/mom/webhook/WebhookDeliveryThread.java: ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.cloudstack.mom.webhook; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import javax.crypto.Mac; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; + +import org.apache.cloudstack.framework.async.AsyncCompletionCallback; +import org.apache.cloudstack.framework.async.AsyncRpcContext; +import org.apache.cloudstack.framework.events.Event; +import org.apache.cloudstack.storage.command.CommandResult; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicHeader; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.util.EntityUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +public class WebhookDeliveryThread implements Runnable { + protected static Logger LOGGER = LogManager.getLogger(WebhookDeliveryThread.class); + + private static final String HEADER_X_CS_EVENT_ID = "X-CS-Event-ID"; + private static final String HEADER_X_CS_EVENT = "X-CS-Event"; + private static final String HEADER_X_CS_SIGNATURE = "X-CS-Signature"; + private static final String PREFIX_HEADER_USER_AGENT = "CS-Hookshot/"; + private final Webhook webhook; + private final Event event; + private CloseableHttpClient httpClient; + private String headers; + private String payload; + private String response; + private Date startTime; + private int deliveryRetries = 3; + private int deliveryTimeout = 10; + + AsyncCompletionCallback<WebhookDeliveryResult> callback; + + protected boolean isValidJson(String json) { + try { + new JSONObject(json); + } catch (JSONException ex) { + try { + new JSONArray(json); + } catch (JSONException ex1) { + return false; + } + } + return true; + } + + protected void setHttpClient() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + if (webhook.isSslVerification()) { + httpClient = HttpClients.createDefault(); + return; + } + httpClient = HttpClients + .custom() + .setSSLContext(new SSLContextBuilder().loadTrustMaterial(null, + TrustAllStrategy.INSTANCE).build()) + .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .build(); + } + + protected HttpPost getBasicHttpPostRequest() throws URISyntaxException { + final URI uri = new URI(webhook.getPayloadUrl()); + HttpPost request = new HttpPost(); + RequestConfig.Builder requestConfig = RequestConfig.custom(); + requestConfig.setConnectTimeout(deliveryTimeout * 1000); + requestConfig.setConnectionRequestTimeout(deliveryTimeout * 1000); + requestConfig.setSocketTimeout(deliveryTimeout * 1000); + request.setConfig(requestConfig.build()); + request.setURI(uri); + return request; + } + + protected void updateRequestHeaders(HttpPost request) throws DecoderException, NoSuchAlgorithmException, + InvalidKeyException { + request.addHeader(HEADER_X_CS_EVENT_ID, event.getEventUuid()); + request.addHeader(HEADER_X_CS_EVENT, event.getEventType()); + request.setHeader(HttpHeaders.USER_AGENT, String.format("%s%s", PREFIX_HEADER_USER_AGENT, + event.getResourceAccountUuid())); + if (StringUtils.isNotBlank(webhook.getSecretKey())) { + request.addHeader(HEADER_X_CS_SIGNATURE, generateHMACSignature(payload, webhook.getSecretKey())); + } + List<Header> headers = new ArrayList<>(Arrays.asList(request.getAllHeaders())); + HttpEntity entity = request.getEntity(); + if (entity.getContentLength() > 0 && !request.containsHeader(HttpHeaders.CONTENT_LENGTH)) { + headers.add(new BasicHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(entity.getContentLength()))); + } + if (entity.getContentType() != null && !request.containsHeader(HttpHeaders.CONTENT_TYPE)) { + headers.add(entity.getContentType()); + } + if (entity.getContentEncoding() != null && !request.containsHeader(HttpHeaders.CONTENT_ENCODING)) { + headers.add(entity.getContentEncoding()); + } + this.headers = StringUtils.join(headers, "\n"); + } + + public WebhookDeliveryThread(Webhook webhook, Event event, + AsyncCompletionCallback<WebhookDeliveryResult> callback) { + this.webhook = webhook; + this.event = event; + this.callback = callback; + } + + public void setDeliveryRetries(int deliveryRetries) { + this.deliveryRetries = deliveryRetries; + } + + public void setDeliveryTimeout(int deliveryTimeout) { + this.deliveryTimeout = deliveryTimeout; + } + + @Override + public void run() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Delivering event: {} for {}", event.getEventType(), webhook); + } + if (event == null) { + LOGGER.warn("Invalid event received for delivering to {}", webhook); + return; + } + payload = event.getDescription(); + int attempt = 0; + boolean success = false; + try { + setHttpClient(); + } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) { + response = String.format("Failed to initiate delivery due to : %s", e.getMessage()); + callback.complete(new WebhookDeliveryResult(headers, payload, success, response, new Date())); + return; + } + while (attempt < deliveryRetries) { + attempt++; + if (delivery(attempt)) { + success = true; + break; + } + } + callback.complete(new WebhookDeliveryResult(headers, payload, success, response, startTime)); + } + + protected void updateResponseFromRequest(HttpEntity entity) { + try { + this.response = EntityUtils.toString(entity, StandardCharsets.UTF_8); + } catch (IOException e) { + LOGGER.error("Failed to parse response for event: {} for {}", + event.getEventType(), webhook); + this.response = ""; + } + } + + protected boolean delivery(int attempt) { + startTime = new Date(); + try { + HttpPost request = getBasicHttpPostRequest(); + StringEntity input = new StringEntity(payload, + isValidJson(payload) ? ContentType.APPLICATION_JSON : ContentType.TEXT_PLAIN); + request.setEntity(input); + updateRequestHeaders(request); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Delivering event: {} for {} with timeout: {}, " + + "attempt #{}", event.getEventType(), webhook, + deliveryTimeout, attempt); + } + final CloseableHttpResponse response = httpClient.execute(request); + updateResponseFromRequest(response.getEntity()); + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Successfully delivered event: {} for {}", + event.getEventType(), webhook); + } Review Comment: ```suggestion LOGGER.trace("Successfully delivered event: {} for {}", event.getEventType(), webhook); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cloudstack.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org