This is an automated email from the ASF dual-hosted git repository. apkhmv pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 292546402c IGNITE-21665 Implement Event log (#3522) 292546402c is described below commit 292546402cc6e3a86a5e3d83d8ad501e22f24978 Author: Aleksandr Pakhomov <apk...@gmail.com> AuthorDate: Mon Apr 1 21:03:49 2024 +0700 IGNITE-21665 Implement Event log (#3522) --- modules/eventlog/README.MD | 43 ++++++ .../eventlog/ItEventLogConfigurationTest.java | 44 ++++++ .../apache/ignite/internal/eventlog/api/Event.java | 10 +- .../api/{EventLog.java => EventChannel.java} | 19 ++- .../ignite/internal/eventlog/api/EventFactory.java | 10 +- .../ignite/internal/eventlog/api/EventLog.java | 26 +++- .../IgniteEventType.java} | 8 +- .../eventlog/{event => api}/IgniteEvents.java | 15 +- .../internal/eventlog/{sink => api}/Sink.java | 12 +- ...Schema.java => ChannelConfigurationSchema.java} | 26 ++-- .../config/schema/EventLogConfigurationSchema.java | 3 + .../config/schema/LogSinkConfigurationSchema.java | 4 +- .../config/schema/SinkConfigurationSchema.java | 9 +- .../ignite/internal/eventlog/event/EventImpl.java | 4 +- .../internal/eventlog/event/EventTypeRegistry.java | 7 +- .../event/exception/MissingEventTypeException.java | 2 +- .../EventLog.java => impl/ChannelFactory.java} | 21 +-- .../SinkFactory.java => impl/ChannelRegistry.java} | 33 ++--- .../impl/ConfigurationBasedChannelRegistry.java | 115 +++++++++++++++ .../impl/ConfigurationBasedSinkRegistry.java | 95 +++++++++++++ .../EventChannelImpl.java} | 31 +++-- .../internal/eventlog/impl/EventLogImpl.java | 57 ++++++++ .../internal/eventlog/{sink => impl}/LogSink.java | 5 +- .../eventlog/{sink => impl}/SinkFactory.java | 9 +- .../{api/EventLog.java => impl/SinkRegistry.java} | 15 +- .../internal/eventlog/event/IgniteEventsTest.java | 1 + .../ConfigurationBasedChannelRegistryTest.java | 155 +++++++++++++++++++++ .../impl/ConfigurationBasedSinkRegistryTest.java | 111 +++++++++++++++ .../internal/eventlog/impl/EventLogTest.java | 143 +++++++++++++++++++ 
.../eventlog/{sink => impl}/LogSinkTest.java | 5 +- .../eventlog/ser/JsonEventSerializerTest.java | 4 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 8 +- .../authentication/AuthenticationManagerImpl.java | 2 +- 33 files changed, 942 insertions(+), 110 deletions(-) diff --git a/modules/eventlog/README.MD b/modules/eventlog/README.MD new file mode 100644 index 0000000000..ec84f7eaf8 --- /dev/null +++ b/modules/eventlog/README.MD @@ -0,0 +1,43 @@ +# Ignite Event Log + +Event log is a feature that allows logging events that happen in the system into some destination. + +There are several things that can be configured in this module: + +- Sink: The destination where the events will be logged. The sink can be a file, a database, a message queue, etc. +Currently, only the logger sink is supported. +- Channels: Group different types of events that can be written into several sinks. +For example, there can be a channel for EVENT_TYPE_1 and EVENT_TYPE_2. All other events won't be logged into any sink that is +piped to this channel. +- Event Types: The type of the event. Each module can define and fire its own event types. + +## For users + +There is a finite number of event types defined in the system. You cannot create new event types. + +To start logging events and then read them, you need to configure the event log. + +### Configuration + +``` +eventlog: + channels.authenticationChannel: { + enabled: true, + events: [USER_AUTHENTICATED] + } + sinks.authenticationLoggerSink: { + type: "log", + criteria: "authEventLog", + channel: "authenticationChannel" + } +``` + +This configuration defines a channel called `authenticationChannel` that will log events of type `USER_AUTHENTICATED` +into the sink `authenticationLoggerSink`. The sink has the type `log`, which means that the events will be logged into +the logger that is defined for the system. The criteria is the name of the logger that will be used to log the events. 
+You can configure the logger with a name `authEventLog` in the logger configuration. + +## For developers + +To log an event, you need to use the `EventLog` interface. This is the only way to do it. If you want to define your +own event type, you need to register it in the `EventTypeRegistry` before the first creation of the event of this type. diff --git a/modules/eventlog/src/integrationTest/java/org/apache/ignite/internal/eventlog/ItEventLogConfigurationTest.java b/modules/eventlog/src/integrationTest/java/org/apache/ignite/internal/eventlog/ItEventLogConfigurationTest.java new file mode 100644 index 0000000000..d8782675d9 --- /dev/null +++ b/modules/eventlog/src/integrationTest/java/org/apache/ignite/internal/eventlog/ItEventLogConfigurationTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.eventlog; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; +import org.apache.ignite.internal.eventlog.config.schema.LogSinkChange; +import org.junit.jupiter.api.Test; + +class ItEventLogConfigurationTest extends ClusterPerClassIntegrationTest { + @Test + void correctConfiguration() { + assertDoesNotThrow(() -> CLUSTER.aliveNode().clusterConfiguration().change(c -> + c.changeRoot(EventLogConfiguration.KEY).changeSinks().create("logSink", s -> { + // Configure the channel. + s.changeChannel("testChannel"); + + // Configure the log sink. + var logSinkChange = (LogSinkChange) s.convert("log"); + logSinkChange.changeCriteria("EventLog"); + logSinkChange.changeLevel("info"); + logSinkChange.changeFormat("json"); + logSinkChange.changeChannel("testChannel"); + })).get() + ); + } +} diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java index ad282447cd..8e5f4f6f57 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java @@ -24,6 +24,11 @@ import org.apache.ignite.internal.eventlog.event.EventUser; /** Represents an event object that can be logged to the event log. */ public interface Event { + /** Default builder for the event object. */ + static EventBuilder builder() { + return new EventBuilder(); + } + /** The type of the event. The type must be registered in the {@link EventTypeRegistry}. */ String type(); @@ -38,9 +43,4 @@ public interface Event { /** The event-specific fields of the event. */ Map<String, Object> fields(); - - /** Default builder for the event object. 
*/ - static EventBuilder builder() { - return new EventBuilder(); - } } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventChannel.java similarity index 64% copy from modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java copy to modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventChannel.java index bc8c364a7f..71824409a3 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventChannel.java @@ -17,12 +17,21 @@ package org.apache.ignite.internal.eventlog.api; -import java.util.function.Supplier; +import java.util.Set; /** - * Logs events into specified sinks. - * TODO: https://issues.apache.org/jira/browse/IGNITE-21665. + * Event channel that groups events by type and sends these events into sinks that are piped into the channel. */ -public interface EventLog { - void log(Supplier<Event> eventProvider); +public interface EventChannel { + /** + * Returns the set of event types that this channel can handle. + */ + Set<String> types(); + + /** + * Logs the event into the channel. If the event type is not supported by the channel, the exception is thrown. + * + * @param event Event to log. 
+ */ + void log(Event event); } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java index 6357981ad8..d652ead416 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java @@ -19,11 +19,10 @@ package org.apache.ignite.internal.eventlog.api; import org.apache.ignite.internal.eventlog.event.EventBuilder; import org.apache.ignite.internal.eventlog.event.EventUser; -import org.apache.ignite.internal.eventlog.event.IgniteEventTypes; /** - * The factory that is responsible for creating events. This interface should be used everywhere where events are created. - * Only special cases should use {@link org.apache.ignite.internal.eventlog.event.EventBuilder} directly, for example, in tests. + * The factory that is responsible for creating events. This interface should be used everywhere where events are created. Only special + * cases should use {@link EventBuilder} directly, for example, in tests. */ public interface EventFactory { /** @@ -35,9 +34,8 @@ public interface EventFactory { Event create(EventUser user); /** - * Creates an event builder with type defined. The type is set by the factory. - * For example, {@link org.apache.ignite.internal.eventlog.event.IgniteEvents.CONNECTION_CLOSED.build} will return - * a builder with {@link IgniteEventTypes.CONNECTION_CLOSED} set. + * Creates an event builder with the event type defined. The type is set by the factory. For example, + * {@link IgniteEvents.CONNECTION_CLOSED.build} will return a builder with {@link IgniteEventType.CONNECTION_CLOSED} type set. 
*/ EventBuilder builder(); } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java index bc8c364a7f..922d3754e1 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java @@ -20,9 +20,31 @@ package org.apache.ignite.internal.eventlog.api; import java.util.function.Supplier; /** - * Logs events into specified sinks. - * TODO: https://issues.apache.org/jira/browse/IGNITE-21665. + * The main interface for logging events. + * + * <p>Example of usage. Let it be configured in the cluster configuration: + * <pre> + * eventlog.channels.exampleChannel: { + * events: [USER_AUTHENTICATED], + * enabled: true + * } + * eventlog.sinks.exampleSink: { + * channel: "exampleChannel", + * type: "log", + * criteria: "exampleLog" + * } + * </pre> + * + * <p>Here is how to fire an event that will be logged into the log file defined by "exampleLog": + * <pre> + * eventLog.log(() -> IgniteEvents.USER_AUTHENTICATED.create(EventUser.of("user1", "basicAuthenticationProvider"))); + * </pre> */ public interface EventLog { + /** + * Writes event into every channel this event relates to. + * + * @param eventProvider Event provider. 
+ */ void log(Supplier<Event> eventProvider); } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEventTypes.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java similarity index 80% rename from modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEventTypes.java rename to modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java index 91ec1ebca3..84f57ecbc1 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEventTypes.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.ignite.internal.eventlog.event; +package org.apache.ignite.internal.eventlog.api; /** - * Defines a subset of event types that can be created in the system. Note, the event type is a string that is unique - * within the system. The event type is used to filter the events in the event log. + * Defines a subset of event types that can be created in the system. Note, the event type is a string that is unique within the system. The + * event type is used to filter the events in the event log. 
*/ -public enum IgniteEventTypes { +public enum IgniteEventType { USER_AUTHENTICATED, CONNECTION_CLOSED } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEvents.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEvents.java similarity index 73% rename from modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEvents.java rename to modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEvents.java index 16edfdbb00..5ee7131f35 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEvents.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEvents.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.ignite.internal.eventlog.event; +package org.apache.ignite.internal.eventlog.api; import java.util.Arrays; -import org.apache.ignite.internal.eventlog.api.Event; -import org.apache.ignite.internal.eventlog.api.EventFactory; +import org.apache.ignite.internal.eventlog.event.EventBuilder; +import org.apache.ignite.internal.eventlog.event.EventTypeRegistry; +import org.apache.ignite.internal.eventlog.event.EventUser; /** * The main class for creating all Ignite events. 
@@ -30,12 +31,14 @@ import org.apache.ignite.internal.eventlog.api.EventFactory; * <pre>{@code IgniteEvents.USER_AUTHENTICATED.create(EventUser.system());}</pre> */ public final class IgniteEvents implements EventFactory { - public static final IgniteEvents USER_AUTHENTICATED = new IgniteEvents(IgniteEventTypes.USER_AUTHENTICATED.name()); + public static final IgniteEvents USER_AUTHENTICATED = new IgniteEvents(IgniteEventType.USER_AUTHENTICATED.name()); - public static final IgniteEvents CONNECTION_CLOSED = new IgniteEvents(IgniteEventTypes.CONNECTION_CLOSED.name()); + public static final IgniteEvents CONNECTION_CLOSED = new IgniteEvents(IgniteEventType.CONNECTION_CLOSED.name()); static { - Arrays.stream(IgniteEventTypes.values()).forEach(type -> EventTypeRegistry.register(type.name())); + // Without the following line, the IgniteEventType enum will not be registered in the EventTypeRegistry + // and the EventTypeRegistry will not be able to validate the event types. + Arrays.stream(IgniteEventType.values()).forEach(type -> EventTypeRegistry.register(type.name())); } private final String type; diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/Sink.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Sink.java similarity index 81% rename from modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/Sink.java rename to modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Sink.java index 32b7daaada..164771e9c2 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/Sink.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Sink.java @@ -15,19 +15,17 @@ * limitations under the License. */ -package org.apache.ignite.internal.eventlog.sink; - -import org.apache.ignite.internal.eventlog.api.Event; +package org.apache.ignite.internal.eventlog.api; /** - * The endpoint for the event log framework. 
This is the last step in the event log pipeline. - * It can be a log file, a webhook, or a Kafka topic, or whatever we develop. + * The endpoint for the event log framework. This is the last step in the event log pipeline. It can be a log file, a webhook, a Kafka + * topic, or whatever we develop. * * <p>The contract of the only method is the following: * * <p>IT DOES NOT GUARANTEE THAT THE EVENT IS WRITTEN TO THE FINAL DESTINATION. - * For example, if the sink as a log file, the method does not guarantee that the event is written to the file. - * Because the logging framework can be asynchronous. + * For example, if the sink is a log file, the method does not guarantee that the event is written to the file, because the logging + * framework can be asynchronous. * * <p>IT DOES GUARANTEE THAT THE EVENT IS SENT TO THE SINK. * For example, if the sink is a Kafka topic, the method guarantees that the event is sent to the topic. diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/ChannelConfigurationSchema.java similarity index 61% copy from modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java copy to modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/ChannelConfigurationSchema.java index d46348e5b2..9d123bd04a 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/ChannelConfigurationSchema.java @@ -17,19 +17,23 @@ package org.apache.ignite.internal.eventlog.config.schema; +import org.apache.ignite.configuration.annotation.Config; import org.apache.ignite.configuration.annotation.InjectedName; -import org.apache.ignite.configuration.annotation.PolymorphicConfig; -import 
org.apache.ignite.configuration.annotation.PolymorphicId; +import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.internal.eventlog.api.IgniteEventType; - -/** Configuration schema for sink. */ -@PolymorphicConfig -public class SinkConfigurationSchema { - /** The id of the sink that is used to identify the type: log, webhook, kafka. */ - @PolymorphicId(hasDefault = true) - public String id = LogSinkConfigurationSchema.POLYMORPHIC_ID; - - /** The name of the sink. */ +/** Channel configuration schema. */ +@Config +public class ChannelConfigurationSchema { + /** Channel name. This name is used to reference from sink configuration. */ @InjectedName public String name; + + /** Channel enabled flag. */ + @Value(hasDefault = true) + public boolean enabled = true; + + /** Event types that this channel should handle. Should be the types from {@link IgniteEventType}. */ + @Value(hasDefault = true) + public String[] events = {}; } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/EventLogConfigurationSchema.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/EventLogConfigurationSchema.java index 1d575a2834..79222fc415 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/EventLogConfigurationSchema.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/EventLogConfigurationSchema.java @@ -27,4 +27,7 @@ public class EventLogConfigurationSchema { /** The configuration schema for sinks. 
*/ @NamedConfigValue public SinkConfigurationSchema sinks; + + @NamedConfigValue + public ChannelConfigurationSchema channels; } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/LogSinkConfigurationSchema.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/LogSinkConfigurationSchema.java index f9d267f06a..b507485bfa 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/LogSinkConfigurationSchema.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/LogSinkConfigurationSchema.java @@ -27,8 +27,8 @@ public class LogSinkConfigurationSchema extends SinkConfigurationSchema { public static final String POLYMORPHIC_ID = "log"; /** - * The criteria for the logger. In other words, the name of the logger. - * This name should be used to configure the logger in the logging framework. + * The criteria for the logger. In other words, the name of the logger. This name should be used to configure the logger in the logging + * framework. 
*/ @Value(hasDefault = true) public String criteria = "EventLog"; diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java index d46348e5b2..7fa98fdafe 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java @@ -20,16 +20,21 @@ package org.apache.ignite.internal.eventlog.config.schema; import org.apache.ignite.configuration.annotation.InjectedName; import org.apache.ignite.configuration.annotation.PolymorphicConfig; import org.apache.ignite.configuration.annotation.PolymorphicId; +import org.apache.ignite.configuration.annotation.Value; /** Configuration schema for sink. */ @PolymorphicConfig public class SinkConfigurationSchema { - /** The id of the sink that is used to identify the type: log, webhook, kafka. */ + /** The type of the sink that is used to identify the type: log, webhook, kafka. */ @PolymorphicId(hasDefault = true) - public String id = LogSinkConfigurationSchema.POLYMORPHIC_ID; + public String type = LogSinkConfigurationSchema.POLYMORPHIC_ID; /** The name of the sink. */ @InjectedName public String name; + + /** The channel to which the sink is connected. Should be one of existing channels. 
*/ + @Value(hasDefault = true) + public String channel = ""; } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java index 2389c91f2e..c5691f2aeb 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java @@ -23,8 +23,8 @@ import org.apache.ignite.internal.eventlog.api.Event; import org.apache.ignite.internal.eventlog.ser.JsonEventSerializer; /** - * Implementation of the {@link Event} interface. The class is immutable and thread-safe. - * If you want to create an instance of this class, use the {@link EventBuilder}. + * Implementation of the {@link Event} interface. The class is immutable and thread-safe. If you want to create an instance of this class, + * use the {@link EventBuilder}. * * <p>NOTE: If you rename/add any field in this class, you should also update the {@link JsonEventSerializer}. 
*/ diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventTypeRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventTypeRegistry.java index 61a6a095ae..f5d37c8676 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventTypeRegistry.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventTypeRegistry.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.eventlog.event; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.internal.eventlog.api.IgniteEvents; import org.apache.ignite.internal.eventlog.event.exception.NotUniqueEventTypeException; /** @@ -31,6 +32,9 @@ public final class EventTypeRegistry { private static final ConcurrentHashMap<String, Object> allTypes = new ConcurrentHashMap<>(); + private EventTypeRegistry() { + } + /** Registers a set of event types. */ public static void register(Set<String> types) { new HashSet<>(types).forEach(EventTypeRegistry::register); @@ -50,7 +54,4 @@ public final class EventTypeRegistry { public static boolean contains(String type) { return allTypes.containsKey(type); } - - private EventTypeRegistry() { - } } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java index ea5636e218..b30716cd34 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.lang.IgniteInternalException; public class MissingEventTypeException extends IgniteInternalException { private static final long serialVersionUID 
= -111097551088227263L; - /** Constructor.*/ + /** Constructor. */ public MissingEventTypeException() { super(ILLEGAL_ARGUMENT_ERR, "Missing event type during event creation."); } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelFactory.java similarity index 64% copy from modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java copy to modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelFactory.java index bc8c364a7f..f43db37e39 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelFactory.java @@ -15,14 +15,19 @@ * limitations under the License. */ -package org.apache.ignite.internal.eventlog.api; +package org.apache.ignite.internal.eventlog.impl; -import java.util.function.Supplier; +import java.util.Set; +import org.apache.ignite.internal.eventlog.api.EventChannel; -/** - * Logs events into specified sinks. - * TODO: https://issues.apache.org/jira/browse/IGNITE-21665. 
- */ -public interface EventLog { - void log(Supplier<Event> eventProvider); +class ChannelFactory { + private final SinkRegistry sinkRegistry; + + ChannelFactory(SinkRegistry sinkRegistry) { + this.sinkRegistry = sinkRegistry; + } + + EventChannel createChannel(String name, Set<String> types) { + return new EventChannelImpl(types, sinkRegistry.findAllByChannel(name)); + } } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java similarity index 52% copy from modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java copy to modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java index e3c4847101..74b7415dfe 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java @@ -15,28 +15,29 @@ * limitations under the License. */ -package org.apache.ignite.internal.eventlog.sink; +package org.apache.ignite.internal.eventlog.impl; -import org.apache.ignite.internal.eventlog.config.schema.LogSinkView; -import org.apache.ignite.internal.eventlog.config.schema.SinkView; -import org.apache.ignite.internal.lang.IgniteInternalException; -import org.apache.ignite.lang.ErrorGroups.Common; +import java.util.Set; +import org.apache.ignite.internal.eventlog.api.EventChannel; /** - * Factory for creating sink instances. + * Channel registry. The only way to send an event into channel is to get the channel from this registry. + * The channel can not be cached for a long time because it can be removed from the registry due to configuration changes. */ -public class SinkFactory { +interface ChannelRegistry { /** - * Creates a sink instance. + * Get channel by name. * - * @param sinkView Sink configuration view. - * @return Sink instance. 
+ * @param name Channel name. + * @return Channel instance. */ - public Sink createSink(SinkView sinkView) { - if (sinkView instanceof LogSinkView) { - return new LogSink((LogSinkView) sinkView); - } + EventChannel getByName(String name); - throw new IgniteInternalException(Common.INTERNAL_ERR, "Unsupported sink type: " + sinkView.id()); - } + /** + * Get all channels that can handle the given event type. + * + * @param igniteEventType Ignite event type. + * @return Set of channels. + */ + Set<EventChannel> findAllChannelsByEventType(String igniteEventType); } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java new file mode 100644 index 0000000000..bd2fd3be90 --- /dev/null +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.eventlog.impl; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import org.apache.ignite.configuration.NamedListView; +import org.apache.ignite.configuration.notifications.ConfigurationListener; +import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent; +import org.apache.ignite.internal.eventlog.api.EventChannel; +import org.apache.ignite.internal.eventlog.config.schema.ChannelView; +import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; + +class ConfigurationBasedChannelRegistry implements ChannelRegistry { + private final ReadWriteLock guard; + + private final Map<String, EventChannel> cache; + + private final Map<String, Set<EventChannel>> typeCache; + + private final SinkRegistry sinkRegistry; + + ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry sinkRegistry) { + this.guard = new ReentrantReadWriteLock(); + this.cache = new HashMap<>(); + this.typeCache = new HashMap<>(); + this.sinkRegistry = sinkRegistry; + + cfg.channels().listen(new CacheUpdater()); + } + + @Override + public EventChannel getByName(String name) { + guard.readLock().lock(); + try { + return cache.get(name); + } finally { + guard.readLock().unlock(); + } + } + + @Override + public Set<EventChannel> findAllChannelsByEventType(String igniteEventType) { + guard.readLock().lock(); + try { + Set<EventChannel> result = typeCache.get(igniteEventType); + return result == null ? 
Set.of() : new HashSet<>(result); + } finally { + guard.readLock().unlock(); + } + } + + private class CacheUpdater implements ConfigurationListener<NamedListView<ChannelView>> { + @Override + public CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<NamedListView<ChannelView>> ctx) { + NamedListView<ChannelView> newListValue = ctx.newValue(); + + guard.writeLock().lock(); + + try { + cache.clear(); + typeCache.clear(); + + newListValue.forEach(view -> { + if (view.enabled()) { + EventChannel channel = createChannel(view); + cache.put(view.name(), channel); + for (String eventType : view.events()) { + typeCache.computeIfAbsent( + eventType.trim(), + t -> new HashSet<>() + ).add(channel); + } + } + }); + + return completedFuture(null); + } finally { + guard.writeLock().unlock(); + } + } + + private EventChannel createChannel(ChannelView view) { + Set<String> types = Arrays.stream(view.events()) + .map(String::trim) + .collect(Collectors.toSet()); + + return new EventChannelImpl(types, sinkRegistry.findAllByChannel(view.name())); + } + } +} diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java new file mode 100644 index 0000000000..937a9fcab9 --- /dev/null +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.eventlog.impl; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.configuration.NamedListView; +import org.apache.ignite.configuration.notifications.ConfigurationListener; +import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent; +import org.apache.ignite.internal.eventlog.api.Sink; +import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; +import org.apache.ignite.internal.eventlog.config.schema.SinkView; + +class ConfigurationBasedSinkRegistry implements SinkRegistry { + private final ReadWriteLock guard; + + private final Map<String, Sink> cache; + + private final Map<String, Set<Sink>> cacheByChannel; + + private final SinkFactory sinkFactory; + + ConfigurationBasedSinkRegistry(EventLogConfiguration cfg) { + this.guard = new ReentrantReadWriteLock(); + this.cache = new HashMap<>(); + this.cacheByChannel = new HashMap<>(); + this.sinkFactory = new SinkFactory(); + + cfg.sinks().listen(new CacheUpdater()); + } + + @Override + public Sink getByName(String name) { + guard.readLock().lock(); + try { + return cache.get(name); + } finally { + guard.readLock().unlock(); + } + } + + @Override + public Set<Sink> findAllByChannel(String channel) { + 
guard.readLock().lock(); + try { + Set<Sink> sinks = cacheByChannel.get(channel); + return sinks == null ? Set.of() : new HashSet<>(sinks); + } finally { + guard.readLock().unlock(); + } + } + + private class CacheUpdater implements ConfigurationListener<NamedListView<SinkView>> { + @Override + public CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<NamedListView<SinkView>> ctx) { + NamedListView<SinkView> newListValue = ctx.newValue(); + + guard.writeLock().lock(); + try { + cache.clear(); + cacheByChannel.clear(); + for (SinkView sinkView : newListValue) { + Sink sink = sinkFactory.createSink(sinkView); + cache.put(sinkView.name(), sink); + cacheByChannel.computeIfAbsent(sinkView.channel(), k -> new HashSet<>()).add(sink); + } + return completedFuture(null); + } finally { + guard.writeLock().unlock(); + } + } + } +} diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventChannelImpl.java similarity index 53% copy from modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java copy to modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventChannelImpl.java index ea5636e218..94ef41acb7 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventChannelImpl.java @@ -15,18 +15,31 @@ * limitations under the License. 
*/ -package org.apache.ignite.internal.eventlog.event.exception; +package org.apache.ignite.internal.eventlog.impl; -import static org.apache.ignite.lang.ErrorGroups.Common.ILLEGAL_ARGUMENT_ERR; +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.eventlog.api.Event; +import org.apache.ignite.internal.eventlog.api.EventChannel; +import org.apache.ignite.internal.eventlog.api.Sink; -import org.apache.ignite.internal.lang.IgniteInternalException; +class EventChannelImpl implements EventChannel { + private final Set<Sink> sinks; -/** Thrown when the event type is missing during event creation. */ -public class MissingEventTypeException extends IgniteInternalException { - private static final long serialVersionUID = -111097551088227263L; + private final Set<String> types; - /** Constructor.*/ - public MissingEventTypeException() { - super(ILLEGAL_ARGUMENT_ERR, "Missing event type during event creation."); + EventChannelImpl(Set<String> types, Set<Sink> sinks) { + this.types = new HashSet<>(types); + this.sinks = new HashSet<>(sinks); + } + + @Override + public Set<String> types() { + return types; + } + + @Override + public void log(Event event) { + sinks.forEach(s -> s.write(event)); } } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java new file mode 100644 index 0000000000..dc3c70325f --- /dev/null +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.eventlog.impl; + +import java.util.Set; +import java.util.function.Supplier; +import org.apache.ignite.internal.eventlog.api.Event; +import org.apache.ignite.internal.eventlog.api.EventChannel; +import org.apache.ignite.internal.eventlog.api.EventLog; +import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; + +/** + * Implementation of the {@link EventLog} interface. + */ +public class EventLogImpl implements EventLog { + private final ChannelRegistry channelRegistry; + + /** + * Creates an instance of EventLogImpl. + * + * @param channelRegistry the channel registry. + */ + public EventLogImpl(ChannelRegistry channelRegistry) { + this.channelRegistry = channelRegistry; + } + + /** + * Creates an instance of EventLogImpl that is configured via cluster configuration. + * + * @param cfg the configuration. 
+ */ + public EventLogImpl(EventLogConfiguration cfg) { + this(new ConfigurationBasedChannelRegistry(cfg, new ConfigurationBasedSinkRegistry(cfg))); + } + + @Override + public void log(Supplier<Event> eventProvider) { + Event event = eventProvider.get(); + Set<EventChannel> channel = channelRegistry.findAllChannelsByEventType(event.type()); + channel.forEach(c -> c.log(event)); + } +} diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/LogSink.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java similarity index 92% rename from modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/LogSink.java rename to modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java index 2f9e6e9a25..c2aa43ff0b 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/LogSink.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java @@ -15,17 +15,18 @@ * limitations under the License. */ -package org.apache.ignite.internal.eventlog.sink; +package org.apache.ignite.internal.eventlog.impl; import java.lang.System.Logger; import java.lang.System.Logger.Level; import org.apache.ignite.internal.eventlog.api.Event; +import org.apache.ignite.internal.eventlog.api.Sink; import org.apache.ignite.internal.eventlog.config.schema.LogSinkView; import org.apache.ignite.internal.eventlog.ser.EventSerializer; import org.apache.ignite.internal.eventlog.ser.JsonEventSerializer; /** Sink that writes events to the log using any logging framework the user has configured. 
*/ -public class LogSink implements Sink { +class LogSink implements Sink { private final Logger logger; private final EventSerializer serializer; private final String level; diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java similarity index 87% rename from modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java rename to modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java index e3c4847101..23e1868bfa 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package org.apache.ignite.internal.eventlog.sink; +package org.apache.ignite.internal.eventlog.impl; +import org.apache.ignite.internal.eventlog.api.Sink; import org.apache.ignite.internal.eventlog.config.schema.LogSinkView; import org.apache.ignite.internal.eventlog.config.schema.SinkView; import org.apache.ignite.internal.lang.IgniteInternalException; @@ -25,18 +26,18 @@ import org.apache.ignite.lang.ErrorGroups.Common; /** * Factory for creating sink instances. */ -public class SinkFactory { +class SinkFactory { /** * Creates a sink instance. * * @param sinkView Sink configuration view. * @return Sink instance. 
*/ - public Sink createSink(SinkView sinkView) { + Sink createSink(SinkView sinkView) { if (sinkView instanceof LogSinkView) { return new LogSink((LogSinkView) sinkView); } - throw new IgniteInternalException(Common.INTERNAL_ERR, "Unsupported sink type: " + sinkView.id()); + throw new IgniteInternalException(Common.INTERNAL_ERR, "Unsupported sink type: " + sinkView.type()); } } diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java similarity index 75% copy from modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java copy to modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java index bc8c364a7f..5151ae7b91 100644 --- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java +++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java @@ -15,14 +15,13 @@ * limitations under the License. */ -package org.apache.ignite.internal.eventlog.api; +package org.apache.ignite.internal.eventlog.impl; -import java.util.function.Supplier; +import java.util.Set; +import org.apache.ignite.internal.eventlog.api.Sink; -/** - * Logs events into specified sinks. - * TODO: https://issues.apache.org/jira/browse/IGNITE-21665. 
- */ -public interface EventLog { - void log(Supplier<Event> eventProvider); +interface SinkRegistry { + Sink getByName(String name); + + Set<Sink> findAllByChannel(String channel); } diff --git a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java index f0c6b1cca2..c1749521f8 100644 --- a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java +++ b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo; import java.util.stream.Stream; import org.apache.ignite.internal.eventlog.api.Event; +import org.apache.ignite.internal.eventlog.api.IgniteEvents; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; diff --git a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java new file mode 100644 index 0000000000..5aeebe1c1f --- /dev/null +++ b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.eventlog.impl; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.eventlog.api.EventChannel; +import org.apache.ignite.internal.eventlog.api.IgniteEventType; +import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ConfigurationExtension.class) +class ConfigurationBasedChannelRegistryTest extends BaseIgniteAbstractTest { + private static final String TEST_CHANNEL = "testChannel"; + + @InjectConfiguration + private EventLogConfiguration cfg; + + private ConfigurationBasedChannelRegistry registry; + + @BeforeEach + void setUp() { + registry = new ConfigurationBasedChannelRegistry(cfg, new ConfigurationBasedSinkRegistry(cfg)); + } + + @Test + void noSuchChannel() { + assertNull(registry.getByName("noSuchChannel")); + } + + @Test + void addNewConfigurationEntry() throws Exception { + // Given configuration with a channel. 
+ cfg.channels().change(c -> c.create(TEST_CHANNEL, s -> { + s.changeEnabled(true); + s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name()); + })).get(); + + // When get channel from registry. + EventChannel channel = registry.getByName(TEST_CHANNEL); + + // Then it is configured correctly. + assertThat(channel.types(), hasItem(IgniteEventType.USER_AUTHENTICATED.name())); + } + + @Test + void removeConfigurationEntry() throws Exception { + // Given configuration with a channel. + cfg.channels().change(c -> c.create(TEST_CHANNEL, s -> { + s.changeEnabled(true); + s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name()); + })).get(); + + // When remove configuration entry. + cfg.change(c -> c.changeChannels().delete(TEST_CHANNEL)).get(); + + // Then channel is removed from registry. + assertThat(registry.getByName(TEST_CHANNEL), nullValue()); + } + + @Test + void updateConfigurationEntry() throws Exception { + // Given configuration with a channel. + cfg.channels().change(c -> c.create(TEST_CHANNEL, s -> { + s.changeEnabled(true); + s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name()); + })).get(); + + assertThat(registry.getByName(TEST_CHANNEL).types(), hasSize(1)); + + // When update configuration entry. + cfg.channels().change(c -> c.update(TEST_CHANNEL, s -> { + s.changeEnabled(true); + s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name(), IgniteEventType.CONNECTION_CLOSED.name()); + })).get(); + + // Then channel is updated in registry and types are not the same as they were before the update. + assertThat(registry.getByName(TEST_CHANNEL).types(), hasSize(2)); + } + + @Test + void findAllChannelsByEventType() throws Exception { + // Given configuration with a channel. + cfg.channels().change(c -> c.create(TEST_CHANNEL, s -> { + s.changeEnabled(true); + s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name()); + })).get(); + + // Then registry returns the channel by type. 
+ assertThat(registry.findAllChannelsByEventType(IgniteEventType.USER_AUTHENTICATED.name()), hasSize(1)); + // But for another type it returns empty set. + assertThat(registry.findAllChannelsByEventType(IgniteEventType.CONNECTION_CLOSED.name()), hasSize(0)); + + // When update configuration entry. + cfg.channels().change(c -> c.update(TEST_CHANNEL, s -> { + s.changeEnabled(true); + s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name(), IgniteEventType.CONNECTION_CLOSED.name()); + })).get(); + + // Then registry returns the channel by type. + assertThat(registry.findAllChannelsByEventType(IgniteEventType.USER_AUTHENTICATED.name()), hasSize(1)); + assertThat(registry.findAllChannelsByEventType(IgniteEventType.CONNECTION_CLOSED.name()), hasSize(1)); + + // When add new channel. + cfg.channels().change(c -> c.create("newChannel", s -> { + s.changeEnabled(true); + s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name()); + })).get(); + + // Then. + assertThat(registry.findAllChannelsByEventType(IgniteEventType.USER_AUTHENTICATED.name()), hasSize(2)); + assertThat(registry.findAllChannelsByEventType(IgniteEventType.CONNECTION_CLOSED.name()), hasSize(1)); + } + + @Test + void enableDisable() throws Exception { + // Given configuration with a channel. + cfg.channels().change(c -> c.create(TEST_CHANNEL, s -> { + s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name()); + })).get(); + + assertThat(registry.getByName(TEST_CHANNEL), not(nullValue())); + + // When disable channel. + cfg.channels().change(c -> c.update(TEST_CHANNEL, s -> s.changeEnabled(false))).get(); + + // Then channel is removed from registry. 
+ assertThat(registry.getByName(TEST_CHANNEL), nullValue()); + } +} diff --git a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java new file mode 100644 index 0000000000..14bc42d5de --- /dev/null +++ b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.eventlog.impl; + +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ConfigurationExtension.class) +class ConfigurationBasedSinkRegistryTest extends BaseIgniteAbstractTest { + private static final String TEST_CHANNEL = "testChannel"; + private static final String TEST_SINK = "testSink"; + + @InjectConfiguration + private EventLogConfiguration cfg; + + private ConfigurationBasedSinkRegistry registry; + + @BeforeEach + void setUp() { + registry = new ConfigurationBasedSinkRegistry(cfg); + } + + @Test + void noSuchSink() { + assertNull(registry.getByName("noSuchSink")); + } + + @Test + void addNewConfigurationEntry() throws Exception { + // Given configuration with a sink. + cfg.sinks().change(c -> c.create(TEST_SINK, s -> { + s.changeChannel(TEST_CHANNEL); + })).get(); + + // Then configuration is updated. + assertThat(registry.getByName(TEST_SINK), not(nullValue())); + } + + @Test + void removeConfigurationEntry() throws Exception { + // Given configuration with a sink. + cfg.sinks().change(c -> c.create(TEST_SINK, s -> { + s.changeChannel(TEST_CHANNEL); + })).get(); + + assertThat(registry.getByName(TEST_SINK), not(nullValue())); + + // When configuration is removed. 
+ cfg.sinks().change(c -> c.delete(TEST_SINK)).get(); + + // Then sink is removed from registry. + assertThat(registry.getByName(TEST_SINK), nullValue()); + } + + @Test + void updateConfigurationEntry() throws Exception { + // Given configuration with a sink. + cfg.sinks().change(c -> c.create(TEST_SINK, s -> { + s.changeChannel("some"); + })).get(); + + assertThat(registry.getByName(TEST_SINK), not(nullValue())); + + // And sink can be found by channel. + assertThat(registry.findAllByChannel("some"), hasSize(1)); + + // When the channel is updated. + cfg.sinks().change(c -> c.update(TEST_SINK, s -> { + s.changeChannel(TEST_CHANNEL); + })).get(); + + // Then the sink can not be found by previous channel. + assertThat(registry.findAllByChannel("some"), hasSize(0)); + // And the sink can be found by new channel. + assertThat(registry.findAllByChannel(TEST_CHANNEL), hasSize(1)); + + // When add one more sink with the same channel. + cfg.sinks().change(c -> c.create("newSink", s -> { + s.changeChannel(TEST_CHANNEL); + })).get(); + + // Then the sink can be found by new channel. + assertThat(registry.findAllByChannel(TEST_CHANNEL), hasSize(2)); + } +} diff --git a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java new file mode 100644 index 0000000000..54f9f0d217 --- /dev/null +++ b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.eventlog.impl; + +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.ignite.internal.eventlog.api.Event; +import org.apache.ignite.internal.eventlog.api.EventChannel; +import org.apache.ignite.internal.eventlog.api.EventLog; +import org.apache.ignite.internal.eventlog.api.IgniteEvents; +import org.apache.ignite.internal.eventlog.api.Sink; +import org.apache.ignite.internal.eventlog.event.EventUser; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class EventLogTest { + private static final EventUser TEST_USER = EventUser.of("testuser", "basicAuthenticator"); + private static final Event TEST_EVENT = IgniteEvents.USER_AUTHENTICATED.create(TEST_USER); + private static final String TEST_CHANNEL_NAME = "testChannel"; + + private EventLog eventLog; + + private TestChannelRegistry channelRegistry; + private TestSinkRegistry sinkRegistry; + private ChannelFactory channelFactory; + + @BeforeEach + void setUp() { + channelRegistry = new TestChannelRegistry(); + sinkRegistry = new TestSinkRegistry(); + channelFactory = new ChannelFactory(sinkRegistry); + eventLog = new EventLogImpl(channelRegistry); + } + + @Test + 
void logsEventCorrectly() { + // Given no channels and sinks. + + // Then nothing thrown. + assertDoesNotThrow(() -> eventLog.log(() -> TEST_EVENT)); + + // When add a channel but there is no sink. + channelRegistry.register(TEST_CHANNEL_NAME, () -> channelFactory.createChannel( + TEST_CHANNEL_NAME, Set.of(TEST_EVENT.type())) + ); + + // Then nothing thrown. + assertDoesNotThrow(() -> eventLog.log(() -> TEST_EVENT)); + + // When add a sink for the channel. + List<Event> container = new ArrayList<>(); + sinkRegistry.register(TEST_CHANNEL_NAME, container::add); + + // And log event. + eventLog.log(() -> TEST_EVENT); + + // Then event is logged. + assertThat(container, hasItem(TEST_EVENT)); + + // When log event with a type that is not supported by the channel. + Event event = IgniteEvents.CONNECTION_CLOSED.create(TEST_USER); + + // Then nothing thrown. + assertDoesNotThrow(() -> eventLog.log(() -> event)); + // And the event is not logged. + assertThat(container, not(hasItem(event))); + } + + private static class TestChannelRegistry implements ChannelRegistry { + private final Map<String, Supplier<EventChannel>> channels; + + private TestChannelRegistry() { + channels = new HashMap<>(); + } + + void register(String name, Supplier<EventChannel> channel) { + channels.put(name, channel); + } + + @Override + public EventChannel getByName(String name) { + return channels.get(name).get(); + } + + @Override + public Set<EventChannel> findAllChannelsByEventType(String igniteEventType) { + return channels.values().stream() + .map(Supplier::get) + .filter(channel -> channel.types().contains(igniteEventType)) + .collect(HashSet::new, Set::add, Set::addAll); + } + } + + private static class TestSinkRegistry implements SinkRegistry { + private final Map<String, Sink> sinks; + + private TestSinkRegistry() { + sinks = new HashMap<>(); + } + + void register(String name, Sink sink) { + sinks.put(name, sink); + } + + @Override + public Sink getByName(String name) { + return 
sinks.get(name); + } + + @Override + public Set<Sink> findAllByChannel(String channel) { + if (!sinks.containsKey(channel)) { + return Set.of(); + } + return Set.of(sinks.get(channel)); + } + } +} diff --git a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/sink/LogSinkTest.java b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java similarity index 96% rename from modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/sink/LogSinkTest.java rename to modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java index 7e667b653d..ea6aff7909 100644 --- a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/sink/LogSinkTest.java +++ b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.eventlog.sink; +package org.apache.ignite.internal.eventlog.impl; import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; @@ -28,10 +28,11 @@ import java.nio.file.Path; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.eventlog.api.Event; +import org.apache.ignite.internal.eventlog.api.IgniteEvents; +import org.apache.ignite.internal.eventlog.api.Sink; import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; import org.apache.ignite.internal.eventlog.config.schema.LogSinkChange; import org.apache.ignite.internal.eventlog.event.EventUser; -import org.apache.ignite.internal.eventlog.event.IgniteEvents; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java index d8096d82ff..9473db1c80 100644 --- a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java +++ b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java @@ -23,15 +23,15 @@ import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs; import java.util.Map; import java.util.stream.Stream; import org.apache.ignite.internal.eventlog.api.Event; +import org.apache.ignite.internal.eventlog.api.IgniteEvents; import org.apache.ignite.internal.eventlog.event.EventUser; -import org.apache.ignite.internal.eventlog.event.IgniteEvents; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; class JsonEventSerializerTest { - public static Stream<Arguments> events() { + private static Stream<Arguments> events() { return Stream.of( Arguments.of( IgniteEvents.CONNECTION_CLOSED.builder() diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index b4ca5bf0b2..cbb8a66b03 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -110,6 +110,8 @@ import org.apache.ignite.internal.deployunit.IgniteDeployment; import org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration; import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl; import org.apache.ignite.internal.distributionzones.DistributionZoneManager; +import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; 
+import org.apache.ignite.internal.eventlog.impl.EventLogImpl; import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.ClockServiceImpl; @@ -907,8 +909,10 @@ public class IgniteImpl implements Ignite { private AuthenticationManager createAuthenticationManager() { SecurityConfiguration securityConfiguration = clusterCfgMgr.configurationRegistry() .getConfiguration(SecurityConfiguration.KEY); - // TODO: https://issues.apache.org/jira/browse/IGNITE-21665 - return new AuthenticationManagerImpl(securityConfiguration, (ign) -> {}); + EventLogConfiguration eventLogConfiguration = clusterCfgMgr.configurationRegistry() + .getConfiguration(EventLogConfiguration.KEY); + + return new AuthenticationManagerImpl(securityConfiguration, new EventLogImpl(eventLogConfiguration)); } private RestComponent createRestComponent(String name) { diff --git a/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java b/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java index c5faec9cfc..7997c1ea12 100644 --- a/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java +++ b/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java @@ -32,8 +32,8 @@ import org.apache.ignite.configuration.NamedListView; import org.apache.ignite.configuration.notifications.ConfigurationListener; import org.apache.ignite.internal.event.AbstractEventProducer; import org.apache.ignite.internal.eventlog.api.EventLog; +import org.apache.ignite.internal.eventlog.api.IgniteEvents; import org.apache.ignite.internal.eventlog.event.EventUser; -import org.apache.ignite.internal.eventlog.event.IgniteEvents; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; 
import org.apache.ignite.internal.security.authentication.basic.BasicAuthenticationProviderConfiguration;