This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 292546402c IGNITE-21665 Implement Event log (#3522)
292546402c is described below

commit 292546402cc6e3a86a5e3d83d8ad501e22f24978
Author: Aleksandr Pakhomov <apk...@gmail.com>
AuthorDate: Mon Apr 1 21:03:49 2024 +0700

    IGNITE-21665 Implement Event log (#3522)
---
 modules/eventlog/README.MD                         |  43 ++++++
 .../eventlog/ItEventLogConfigurationTest.java      |  44 ++++++
 .../apache/ignite/internal/eventlog/api/Event.java |  10 +-
 .../api/{EventLog.java => EventChannel.java}       |  19 ++-
 .../ignite/internal/eventlog/api/EventFactory.java |  10 +-
 .../ignite/internal/eventlog/api/EventLog.java     |  26 +++-
 .../IgniteEventType.java}                          |   8 +-
 .../eventlog/{event => api}/IgniteEvents.java      |  15 +-
 .../internal/eventlog/{sink => api}/Sink.java      |  12 +-
 ...Schema.java => ChannelConfigurationSchema.java} |  26 ++--
 .../config/schema/EventLogConfigurationSchema.java |   3 +
 .../config/schema/LogSinkConfigurationSchema.java  |   4 +-
 .../config/schema/SinkConfigurationSchema.java     |   9 +-
 .../ignite/internal/eventlog/event/EventImpl.java  |   4 +-
 .../internal/eventlog/event/EventTypeRegistry.java |   7 +-
 .../event/exception/MissingEventTypeException.java |   2 +-
 .../EventLog.java => impl/ChannelFactory.java}     |  21 +--
 .../SinkFactory.java => impl/ChannelRegistry.java} |  33 ++---
 .../impl/ConfigurationBasedChannelRegistry.java    | 115 +++++++++++++++
 .../impl/ConfigurationBasedSinkRegistry.java       |  95 +++++++++++++
 .../EventChannelImpl.java}                         |  31 +++--
 .../internal/eventlog/impl/EventLogImpl.java       |  57 ++++++++
 .../internal/eventlog/{sink => impl}/LogSink.java  |   5 +-
 .../eventlog/{sink => impl}/SinkFactory.java       |   9 +-
 .../{api/EventLog.java => impl/SinkRegistry.java}  |  15 +-
 .../internal/eventlog/event/IgniteEventsTest.java  |   1 +
 .../ConfigurationBasedChannelRegistryTest.java     | 155 +++++++++++++++++++++
 .../impl/ConfigurationBasedSinkRegistryTest.java   | 111 +++++++++++++++
 .../internal/eventlog/impl/EventLogTest.java       | 143 +++++++++++++++++++
 .../eventlog/{sink => impl}/LogSinkTest.java       |   5 +-
 .../eventlog/ser/JsonEventSerializerTest.java      |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   8 +-
 .../authentication/AuthenticationManagerImpl.java  |   2 +-
 33 files changed, 942 insertions(+), 110 deletions(-)

diff --git a/modules/eventlog/README.MD b/modules/eventlog/README.MD
new file mode 100644
index 0000000000..ec84f7eaf8
--- /dev/null
+++ b/modules/eventlog/README.MD
@@ -0,0 +1,43 @@
+# Ignite Event Log
+
+Event log is a feature that allows to log events that happen in the system 
into some destination.
+
+There are several things that can be configured in this module:
+
+- Sink: The destination where the events will be logged. The sink can be a 
file, a database, a message queue, etc. 
+Now only logger sink is supported.
+- Channels: Group different types of events that can be written into several 
sinks. 
+For example, there can be a channel for EVENT_TYPE_1 and EVENT_TYPE_2. All 
other events won't be logged into any sink that is
+piped to this channel.
+- Event Types: The type of the event. Each module can define and fire its own 
event types.
+
+## For users
+
+There is a finite number of event types that is defined in the system. You can 
not create new event types. 
+
+To start logging events and then read them, you need to configure the event 
log.
+
+### Configuration
+
+``` 
+eventlog:
+  channels.authenticationChannel: {
+      enabled: true,
+      types: [USER_AUTHENTICATED] 
+    }
+  sinks.authenticationLoggerSink: {
+      type: "log",
+      criteria: "authEventLog",
+      channel: "authenticationChannel"
+    }
+```
+
+This configuration defines a channel called `authenticationChannel` that will 
log events of type `USER_AUTHENTICATED` 
+into the sink `authenticationLoggerSink`. The sink has a type `log` that means 
that the events will be logged into 
+the logger that is defined for the system. The criteria is a name of the 
logger that will be used to log the events.
+You can configure the logger with a name `authEventLog` in the logger 
configuration.
+
+## For developers
+
+To log an event, you need to use the `EventLog` interface. This is the only 
way to do it. If you want to define your
+own event type, you need to register it in the `EventTypeRegistry` before the 
first creation of the event of this type.
diff --git 
a/modules/eventlog/src/integrationTest/java/org/apache/ignite/internal/eventlog/ItEventLogConfigurationTest.java
 
b/modules/eventlog/src/integrationTest/java/org/apache/ignite/internal/eventlog/ItEventLogConfigurationTest.java
new file mode 100644
index 0000000000..d8782675d9
--- /dev/null
+++ 
b/modules/eventlog/src/integrationTest/java/org/apache/ignite/internal/eventlog/ItEventLogConfigurationTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+import org.apache.ignite.internal.eventlog.config.schema.LogSinkChange;
+import org.junit.jupiter.api.Test;
+
+class ItEventLogConfigurationTest extends ClusterPerClassIntegrationTest {
+    @Test
+    void correctConfiguration() {
+        assertDoesNotThrow(() -> 
CLUSTER.aliveNode().clusterConfiguration().change(c ->
+                
c.changeRoot(EventLogConfiguration.KEY).changeSinks().create("logSink", s -> {
+                    // Configure the channel.
+                    s.changeChannel("testChannel");
+
+                    // Configure the log sink.
+                    var logSinkChange = (LogSinkChange) s.convert("log");
+                    logSinkChange.changeCriteria("EventLog");
+                    logSinkChange.changeLevel("info");
+                    logSinkChange.changeFormat("json");
+                    logSinkChange.changeChannel("testChannel");
+                })).get()
+        );
+    }
+}
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
index ad282447cd..8e5f4f6f57 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
@@ -24,6 +24,11 @@ import org.apache.ignite.internal.eventlog.event.EventUser;
 
 /** Represents an event object that can be logged to the event log. */
 public interface Event {
+    /** Default builder for the event object. */
+    static EventBuilder builder() {
+        return new EventBuilder();
+    }
+
     /** The type of the event. The type must be registered in the {@link 
EventTypeRegistry}. */
     String type();
 
@@ -38,9 +43,4 @@ public interface Event {
 
     /** The event-specific fields of the event. */
     Map<String, Object> fields();
-
-    /** Default builder for the event object. */
-    static EventBuilder builder() {
-        return new EventBuilder();
-    }
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventChannel.java
similarity index 64%
copy from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
copy to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventChannel.java
index bc8c364a7f..71824409a3 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventChannel.java
@@ -17,12 +17,21 @@
 
 package org.apache.ignite.internal.eventlog.api;
 
-import java.util.function.Supplier;
+import java.util.Set;
 
 /**
- * Logs events into specified sinks.
- * TODO: https://issues.apache.org/jira/browse/IGNITE-21665.
+ * Event channel that groups events by type and sends these events into sinks 
that are piped into the channel.
  */
-public interface EventLog {
-    void log(Supplier<Event> eventProvider);
+public interface EventChannel {
+    /**
+     * Returns the set of event types that this channel can handle.
+     */
+    Set<String> types();
+
+    /**
+     * Logs the event into the channel. If the event type is not supported by 
the channel, the exception is thrown.
+     *
+     * @param event Event to log.
+     */
+    void log(Event event);
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java
index 6357981ad8..d652ead416 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventFactory.java
@@ -19,11 +19,10 @@ package org.apache.ignite.internal.eventlog.api;
 
 import org.apache.ignite.internal.eventlog.event.EventBuilder;
 import org.apache.ignite.internal.eventlog.event.EventUser;
-import org.apache.ignite.internal.eventlog.event.IgniteEventTypes;
 
 /**
- * The factory that is responsible for creating events. This interface should 
be used everywhere where events are created.
- * Only special cases should use {@link 
org.apache.ignite.internal.eventlog.event.EventBuilder} directly, for example, 
in tests.
+ * The factory that is responsible for creating events. This interface should 
be used everywhere where events are created. Only special
+ * cases should use {@link EventBuilder} directly, for example, in tests.
  */
 public interface EventFactory {
     /**
@@ -35,9 +34,8 @@ public interface EventFactory {
     Event create(EventUser user);
 
     /**
-     * Creates an event builder with type defined. The type is set by the 
factory.
-     * For example, {@link 
org.apache.ignite.internal.eventlog.event.IgniteEvents.CONNECTION_CLOSED.build} 
will return
-     * a builder with {@link IgniteEventTypes.CONNECTION_CLOSED} set.
+     * Creates an event builder with the event type defined. The type is set 
by the factory. For example,
+     * {@link IgniteEvents.CONNECTION_CLOSED.build} will return a builder with 
{@link IgniteEventType.CONNECTION_CLOSED} type set.
      */
     EventBuilder builder();
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
index bc8c364a7f..922d3754e1 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
@@ -20,9 +20,31 @@ package org.apache.ignite.internal.eventlog.api;
 import java.util.function.Supplier;
 
 /**
- * Logs events into specified sinks.
- * TODO: https://issues.apache.org/jira/browse/IGNITE-21665.
+ * The main interface for logging events.
+ *
+ * <p>Example of usage. Let it be configured in the cluster configuration:
+ * <pre>
+ *     eventlog.channels.exampleChannel: {
+ *       types: [USER_AUTHENTICATED],
+ *       enabled: true
+ *     }
+ *     eventlog.sinks.exampleSink: {
+ *       channel: "exampleChannel",
+ *       type: "log",
+ *       criteria: "exampleLog"
+ *     }
+ * </pre>
+ *
+ * <p>Here is how to fire an event that will be logged into the log file 
defined by "exampleLog":
+ * <pre>
+ *     eventLog.log(() -> 
IgniteEvents.USER_AUTHENTICATED.create(EventUser.of("user1", 
"basicAuthenticationProvider"));
+ * </pre>
  */
 public interface EventLog {
+    /**
+     * Writes event into every channel this event relates to.
+     *
+     * @param eventProvider Event provider.
+     */
     void log(Supplier<Event> eventProvider);
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEventTypes.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java
similarity index 80%
rename from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEventTypes.java
rename to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java
index 91ec1ebca3..84f57ecbc1 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEventTypes.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.eventlog.event;
+package org.apache.ignite.internal.eventlog.api;
 
 /**
- * Defines a subset of event types that can be created in the system. Note, 
the event type is a string that is unique
- * within the system. The event type is used to filter the events in the event 
log.
+ * Defines a subset of event types that can be created in the system. Note, 
the event type is a string that is unique within the system. The
+ * event type is used to filter the events in the event log.
  */
-public enum IgniteEventTypes {
+public enum IgniteEventType {
     USER_AUTHENTICATED,
     CONNECTION_CLOSED
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEvents.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEvents.java
similarity index 73%
rename from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEvents.java
rename to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEvents.java
index 16edfdbb00..5ee7131f35 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/IgniteEvents.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEvents.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.eventlog.event;
+package org.apache.ignite.internal.eventlog.api;
 
 import java.util.Arrays;
-import org.apache.ignite.internal.eventlog.api.Event;
-import org.apache.ignite.internal.eventlog.api.EventFactory;
+import org.apache.ignite.internal.eventlog.event.EventBuilder;
+import org.apache.ignite.internal.eventlog.event.EventTypeRegistry;
+import org.apache.ignite.internal.eventlog.event.EventUser;
 
 /**
  * The main class for creating all Ignite events.
@@ -30,12 +31,14 @@ import org.apache.ignite.internal.eventlog.api.EventFactory;
  * <pre>{@code 
IgniteEvents.USER_AUTHENTICATED.create(EventUser.system());}</pre>
  */
 public final class IgniteEvents implements EventFactory {
-    public static final IgniteEvents USER_AUTHENTICATED = new 
IgniteEvents(IgniteEventTypes.USER_AUTHENTICATED.name());
+    public static final IgniteEvents USER_AUTHENTICATED = new 
IgniteEvents(IgniteEventType.USER_AUTHENTICATED.name());
 
-    public static final IgniteEvents CONNECTION_CLOSED = new 
IgniteEvents(IgniteEventTypes.CONNECTION_CLOSED.name());
+    public static final IgniteEvents CONNECTION_CLOSED = new 
IgniteEvents(IgniteEventType.CONNECTION_CLOSED.name());
 
     static {
-        Arrays.stream(IgniteEventTypes.values()).forEach(type -> 
EventTypeRegistry.register(type.name()));
+        // Without the following line, the IgniteEventType enum will not be 
registered in the EventTypeRegistry
+        // and the EventTypeRegistry will not be able to validate the event 
types.
+        Arrays.stream(IgniteEventType.values()).forEach(type -> 
EventTypeRegistry.register(type.name()));
     }
 
     private final String type;
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/Sink.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Sink.java
similarity index 81%
rename from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/Sink.java
rename to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Sink.java
index 32b7daaada..164771e9c2 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/Sink.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Sink.java
@@ -15,19 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.eventlog.sink;
-
-import org.apache.ignite.internal.eventlog.api.Event;
+package org.apache.ignite.internal.eventlog.api;
 
 /**
- * The endpoint for the event log framework. This is the last step in the 
event log pipeline.
- * It can be a log file, a webhook, or a Kafka topic, or whatever we develop.
+ * The endpoint for the event log framework. This is the last step in the 
event log pipeline. It can be a log file, a webhook, a Kafka
+ * topic, or whatever we develop.
  *
  * <p>The contract of the only method is the following:
  *
  * <p>IT DOES NOT GUARANTEE THAT THE EVENT IS WRITTEN TO THE FINAL DESTINATION.
- * For example, if the sink as a log file, the method does not guarantee that 
the event is written to the file.
- * Because the logging framework can be asynchronous.
+ * For example, if the sink as a log file, the method does not guarantee that 
the event is written to the file. Because the logging
+ * framework can be asynchronous.
  *
  * <p>IT DOES GUARANTEE THAT THE EVENT IS SENT TO THE SINK.
  * For example, if the sink is a Kafka topic, the method guarantees that the 
event is sent to the topic.
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/ChannelConfigurationSchema.java
similarity index 61%
copy from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java
copy to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/ChannelConfigurationSchema.java
index d46348e5b2..9d123bd04a 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/ChannelConfigurationSchema.java
@@ -17,19 +17,23 @@
 
 package org.apache.ignite.internal.eventlog.config.schema;
 
+import org.apache.ignite.configuration.annotation.Config;
 import org.apache.ignite.configuration.annotation.InjectedName;
-import org.apache.ignite.configuration.annotation.PolymorphicConfig;
-import org.apache.ignite.configuration.annotation.PolymorphicId;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.internal.eventlog.api.IgniteEventType;
 
-
-/** Configuration schema for sink. */
-@PolymorphicConfig
-public class SinkConfigurationSchema {
-    /** The id of the sink that is used to identify the type: log, webhook, 
kafka. */
-    @PolymorphicId(hasDefault = true)
-    public String id = LogSinkConfigurationSchema.POLYMORPHIC_ID;
-
-    /** The name of the sink. */
+/** Channel configuration schema. */
+@Config
+public class ChannelConfigurationSchema {
+    /** Channel name. This name is used to reference from sink configuration. 
*/
     @InjectedName
     public String name;
+
+    /** Channel enabled flag. */
+    @Value(hasDefault = true)
+    public boolean enabled = true;
+
+    /** Event types that this channel should handle. Should be the types from 
{@link IgniteEventType}. */
+    @Value(hasDefault = true)
+    public String[] events = {};
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/EventLogConfigurationSchema.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/EventLogConfigurationSchema.java
index 1d575a2834..79222fc415 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/EventLogConfigurationSchema.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/EventLogConfigurationSchema.java
@@ -27,4 +27,7 @@ public class EventLogConfigurationSchema {
     /** The configuration schema for sinks. */
     @NamedConfigValue
     public SinkConfigurationSchema sinks;
+
+    @NamedConfigValue
+    public ChannelConfigurationSchema channels;
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/LogSinkConfigurationSchema.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/LogSinkConfigurationSchema.java
index f9d267f06a..b507485bfa 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/LogSinkConfigurationSchema.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/LogSinkConfigurationSchema.java
@@ -27,8 +27,8 @@ public class LogSinkConfigurationSchema extends 
SinkConfigurationSchema {
     public static final String POLYMORPHIC_ID = "log";
 
     /**
-     * The criteria for the logger. In other words, the name of the logger.
-     * This name should be used to configure the logger in the logging 
framework.
+     * The criteria for the logger. In other words, the name of the logger. 
This name should be used to configure the logger in the logging
+     * framework.
      */
     @Value(hasDefault = true)
     public String criteria = "EventLog";
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java
index d46348e5b2..7fa98fdafe 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/config/schema/SinkConfigurationSchema.java
@@ -20,16 +20,21 @@ package org.apache.ignite.internal.eventlog.config.schema;
 import org.apache.ignite.configuration.annotation.InjectedName;
 import org.apache.ignite.configuration.annotation.PolymorphicConfig;
 import org.apache.ignite.configuration.annotation.PolymorphicId;
+import org.apache.ignite.configuration.annotation.Value;
 
 
 /** Configuration schema for sink. */
 @PolymorphicConfig
 public class SinkConfigurationSchema {
-    /** The id of the sink that is used to identify the type: log, webhook, 
kafka. */
+    /** The type of the sink that is used to identify the type: log, webhook, 
kafka. */
     @PolymorphicId(hasDefault = true)
-    public String id = LogSinkConfigurationSchema.POLYMORPHIC_ID;
+    public String type = LogSinkConfigurationSchema.POLYMORPHIC_ID;
 
     /** The name of the sink. */
     @InjectedName
     public String name;
+
+    /** The channel to which the sink is connected. Should be one of existing 
channels. */
+    @Value(hasDefault = true)
+    public String channel = "";
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
index 2389c91f2e..c5691f2aeb 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
@@ -23,8 +23,8 @@ import org.apache.ignite.internal.eventlog.api.Event;
 import org.apache.ignite.internal.eventlog.ser.JsonEventSerializer;
 
 /**
- * Implementation of the {@link Event} interface. The class is immutable and 
thread-safe.
- * If you want to create an instance of this class, use the {@link 
EventBuilder}.
+ * Implementation of the {@link Event} interface. The class is immutable and 
thread-safe. If you want to create an instance of this class,
+ * use the {@link EventBuilder}.
  *
  * <p>NOTE: If you rename/add any field in this class, you should also update 
the {@link JsonEventSerializer}.
  */
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventTypeRegistry.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventTypeRegistry.java
index 61a6a095ae..f5d37c8676 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventTypeRegistry.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventTypeRegistry.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.eventlog.event;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.eventlog.api.IgniteEvents;
 import 
org.apache.ignite.internal.eventlog.event.exception.NotUniqueEventTypeException;
 
 /**
@@ -31,6 +32,9 @@ public final class EventTypeRegistry {
 
     private static final ConcurrentHashMap<String, Object> allTypes = new 
ConcurrentHashMap<>();
 
+    private EventTypeRegistry() {
+    }
+
     /** Registers a set of event types. */
     public static void register(Set<String> types) {
         new HashSet<>(types).forEach(EventTypeRegistry::register);
@@ -50,7 +54,4 @@ public final class EventTypeRegistry {
     public static boolean contains(String type) {
         return allTypes.containsKey(type);
     }
-
-    private EventTypeRegistry() {
-    }
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java
index ea5636e218..b30716cd34 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java
@@ -25,7 +25,7 @@ import 
org.apache.ignite.internal.lang.IgniteInternalException;
 public class MissingEventTypeException extends IgniteInternalException {
     private static final long serialVersionUID = -111097551088227263L;
 
-    /** Constructor.*/
+    /** Constructor. */
     public MissingEventTypeException() {
         super(ILLEGAL_ARGUMENT_ERR, "Missing event type during event 
creation.");
     }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelFactory.java
similarity index 64%
copy from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
copy to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelFactory.java
index bc8c364a7f..f43db37e39 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelFactory.java
@@ -15,14 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.eventlog.api;
+package org.apache.ignite.internal.eventlog.impl;
 
-import java.util.function.Supplier;
+import java.util.Set;
+import org.apache.ignite.internal.eventlog.api.EventChannel;
 
-/**
- * Logs events into specified sinks.
- * TODO: https://issues.apache.org/jira/browse/IGNITE-21665.
- */
-public interface EventLog {
-    void log(Supplier<Event> eventProvider);
+class ChannelFactory {
+    private final SinkRegistry sinkRegistry;
+
+    ChannelFactory(SinkRegistry sinkRegistry) {
+        this.sinkRegistry = sinkRegistry;
+    }
+
+    EventChannel createChannel(String name, Set<String> types) {
+        return new EventChannelImpl(types, 
sinkRegistry.findAllByChannel(name));
+    }
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java
similarity index 52%
copy from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java
copy to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java
index e3c4847101..74b7415dfe 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ChannelRegistry.java
@@ -15,28 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.eventlog.sink;
+package org.apache.ignite.internal.eventlog.impl;
 
-import org.apache.ignite.internal.eventlog.config.schema.LogSinkView;
-import org.apache.ignite.internal.eventlog.config.schema.SinkView;
-import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.lang.ErrorGroups.Common;
+import java.util.Set;
+import org.apache.ignite.internal.eventlog.api.EventChannel;
 
 /**
- * Factory for creating sink instances.
+ * Channel registry. The only way to send an event into channel is to get the 
channel from this registry.
+ * The channel can not be cached for a long time because it can be removed 
from the registry due to configuration changes.
  */
-public class SinkFactory {
+interface ChannelRegistry {
     /**
-     * Creates a sink instance.
+     * Get channel by name.
      *
-     * @param sinkView Sink configuration view.
-     * @return Sink instance.
+     * @param name Channel name.
+     * @return Channel instance.
      */
-    public Sink createSink(SinkView sinkView) {
-        if (sinkView instanceof LogSinkView) {
-            return new LogSink((LogSinkView) sinkView);
-        }
+    EventChannel getByName(String name);
 
-        throw new IgniteInternalException(Common.INTERNAL_ERR, "Unsupported 
sink type: " + sinkView.id());
-    }
+    /**
+     * Get all channels that can handle the given event type.
+     *
+     * @param igniteEventType Ignite event type.
+     * @return Set of channels.
+     */
+    Set<EventChannel> findAllChannelsByEventType(String igniteEventType);
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java
new file mode 100644
index 0000000000..bd2fd3be90
--- /dev/null
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistry.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.notifications.ConfigurationListener;
+import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.eventlog.api.EventChannel;
+import org.apache.ignite.internal.eventlog.config.schema.ChannelView;
+import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+
+class ConfigurationBasedChannelRegistry implements ChannelRegistry {
+    private final ReadWriteLock guard;
+
+    private final Map<String, EventChannel> cache;
+
+    private final Map<String, Set<EventChannel>> typeCache;
+
+    private final SinkRegistry sinkRegistry;
+
+    ConfigurationBasedChannelRegistry(EventLogConfiguration cfg, SinkRegistry 
sinkRegistry) {
+        this.guard = new ReentrantReadWriteLock();
+        this.cache = new HashMap<>();
+        this.typeCache = new HashMap<>();
+        this.sinkRegistry = sinkRegistry;
+
+        cfg.channels().listen(new CacheUpdater());
+    }
+
+    @Override
+    public EventChannel getByName(String name) {
+        guard.readLock().lock();
+        try {
+            return cache.get(name);
+        } finally {
+            guard.readLock().unlock();
+        }
+    }
+
+    @Override
+    public Set<EventChannel> findAllChannelsByEventType(String 
igniteEventType) {
+        guard.readLock().lock();
+        try {
+            Set<EventChannel> result = typeCache.get(igniteEventType);
+            return result == null ? Set.of() : new HashSet<>(result);
+        } finally {
+            guard.readLock().unlock();
+        }
+    }
+
+    private class CacheUpdater implements 
ConfigurationListener<NamedListView<ChannelView>> {
+        @Override
+        public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<NamedListView<ChannelView>> ctx) {
+            NamedListView<ChannelView> newListValue = ctx.newValue();
+
+            guard.writeLock().lock();
+
+            try {
+                cache.clear();
+                typeCache.clear();
+
+                newListValue.forEach(view -> {
+                    if (view.enabled()) {
+                        EventChannel channel = createChannel(view);
+                        cache.put(view.name(), channel);
+                        for (String eventType : view.events()) {
+                            typeCache.computeIfAbsent(
+                                    eventType.trim(),
+                                    t -> new HashSet<>()
+                            ).add(channel);
+                        }
+                    }
+                });
+
+                return completedFuture(null);
+            } finally {
+                guard.writeLock().unlock();
+            }
+        }
+
+        private EventChannel createChannel(ChannelView view) {
+            Set<String> types = Arrays.stream(view.events())
+                    .map(String::trim)
+                    .collect(Collectors.toSet());
+
+            return new EventChannelImpl(types, 
sinkRegistry.findAllByChannel(view.name()));
+        }
+    }
+}
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java
new file mode 100644
index 0000000000..937a9fcab9
--- /dev/null
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistry.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.notifications.ConfigurationListener;
+import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.eventlog.api.Sink;
+import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+import org.apache.ignite.internal.eventlog.config.schema.SinkView;
+
+class ConfigurationBasedSinkRegistry implements SinkRegistry {
+    private final ReadWriteLock guard;
+
+    private final Map<String, Sink> cache;
+
+    private final Map<String, Set<Sink>> cacheByChannel;
+
+    private final SinkFactory sinkFactory;
+
+    ConfigurationBasedSinkRegistry(EventLogConfiguration cfg) {
+        this.guard = new ReentrantReadWriteLock();
+        this.cache = new HashMap<>();
+        this.cacheByChannel = new HashMap<>();
+        this.sinkFactory = new SinkFactory();
+
+        cfg.sinks().listen(new CacheUpdater());
+    }
+
+    @Override
+    public Sink getByName(String name) {
+        guard.readLock().lock();
+        try {
+            return cache.get(name);
+        } finally {
+            guard.readLock().unlock();
+        }
+    }
+
+    @Override
+    public Set<Sink> findAllByChannel(String channel) {
+        guard.readLock().lock();
+        try {
+            Set<Sink> sinks = cacheByChannel.get(channel);
+            return sinks == null ? Set.of() : new HashSet<>(sinks);
+        } finally {
+            guard.readLock().unlock();
+        }
+    }
+
+    private class CacheUpdater implements 
ConfigurationListener<NamedListView<SinkView>> {
+        @Override
+        public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<NamedListView<SinkView>> ctx) {
+            NamedListView<SinkView> newListValue = ctx.newValue();
+
+            guard.writeLock().lock();
+            try {
+                cache.clear();
+                cacheByChannel.clear();
+                for (SinkView sinkView : newListValue) {
+                    Sink sink = sinkFactory.createSink(sinkView);
+                    cache.put(sinkView.name(), sink);
+                    cacheByChannel.computeIfAbsent(sinkView.channel(), k -> 
new HashSet<>()).add(sink);
+                }
+                return completedFuture(null);
+            } finally {
+                guard.writeLock().unlock();
+            }
+        }
+    }
+}
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventChannelImpl.java
similarity index 53%
copy from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java
copy to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventChannelImpl.java
index ea5636e218..94ef41acb7 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/exception/MissingEventTypeException.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventChannelImpl.java
@@ -15,18 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.eventlog.event.exception;
+package org.apache.ignite.internal.eventlog.impl;
 
-import static org.apache.ignite.lang.ErrorGroups.Common.ILLEGAL_ARGUMENT_ERR;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.eventlog.api.Event;
+import org.apache.ignite.internal.eventlog.api.EventChannel;
+import org.apache.ignite.internal.eventlog.api.Sink;
 
-import org.apache.ignite.internal.lang.IgniteInternalException;
+class EventChannelImpl implements EventChannel {
+    private final Set<Sink> sinks;
 
-/** Thrown when the event type is missing during event creation. */
-public class MissingEventTypeException extends IgniteInternalException {
-    private static final long serialVersionUID = -111097551088227263L;
+    private final Set<String> types;
 
-    /** Constructor.*/
-    public MissingEventTypeException() {
-        super(ILLEGAL_ARGUMENT_ERR, "Missing event type during event 
creation.");
+    EventChannelImpl(Set<String> types, Set<Sink> sinks) {
+        this.types = new HashSet<>(types);
+        this.sinks = new HashSet<>(sinks);
+    }
+
+    @Override
+    public Set<String> types() {
+        return types;
+    }
+
+    @Override
+    public void log(Event event) {
+        sinks.forEach(s -> s.write(event));
     }
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java
new file mode 100644
index 0000000000..dc3c70325f
--- /dev/null
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.impl;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.eventlog.api.Event;
+import org.apache.ignite.internal.eventlog.api.EventChannel;
+import org.apache.ignite.internal.eventlog.api.EventLog;
+import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+
+/**
+ * Implementation of the {@link EventLog} interface.
+ */
+public class EventLogImpl implements EventLog {
+    private final ChannelRegistry channelRegistry;
+
+    /**
+     * Creates an instance of EventLogImpl.
+     *
+     * @param channelRegistry the channel registry.
+     */
+    public EventLogImpl(ChannelRegistry channelRegistry) {
+        this.channelRegistry = channelRegistry;
+    }
+
+    /**
+     * Creates an instance of EventLogImpl that is configured via cluster 
configuration.
+     *
+     * @param cfg the configuration.
+     */
+    public EventLogImpl(EventLogConfiguration cfg) {
+        this(new ConfigurationBasedChannelRegistry(cfg, new 
ConfigurationBasedSinkRegistry(cfg)));
+    }
+
+    @Override
+    public void log(Supplier<Event> eventProvider) {
+        Event event = eventProvider.get();
+        Set<EventChannel> channel = 
channelRegistry.findAllChannelsByEventType(event.type());
+        channel.forEach(c -> c.log(event));
+    }
+}
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/LogSink.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java
similarity index 92%
rename from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/LogSink.java
rename to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java
index 2f9e6e9a25..c2aa43ff0b 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/LogSink.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java
@@ -15,17 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.eventlog.sink;
+package org.apache.ignite.internal.eventlog.impl;
 
 import java.lang.System.Logger;
 import java.lang.System.Logger.Level;
 import org.apache.ignite.internal.eventlog.api.Event;
+import org.apache.ignite.internal.eventlog.api.Sink;
 import org.apache.ignite.internal.eventlog.config.schema.LogSinkView;
 import org.apache.ignite.internal.eventlog.ser.EventSerializer;
 import org.apache.ignite.internal.eventlog.ser.JsonEventSerializer;
 
 /** Sink that writes events to the log using any logging framework the user 
has configured. */
-public class LogSink implements Sink {
+class LogSink implements Sink {
     private final Logger logger;
     private final EventSerializer serializer;
     private final String level;
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
similarity index 87%
rename from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java
rename to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
index e3c4847101..23e1868bfa 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/sink/SinkFactory.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.eventlog.sink;
+package org.apache.ignite.internal.eventlog.impl;
 
+import org.apache.ignite.internal.eventlog.api.Sink;
 import org.apache.ignite.internal.eventlog.config.schema.LogSinkView;
 import org.apache.ignite.internal.eventlog.config.schema.SinkView;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -25,18 +26,18 @@ import org.apache.ignite.lang.ErrorGroups.Common;
 /**
  * Factory for creating sink instances.
  */
-public class SinkFactory {
+class SinkFactory {
     /**
      * Creates a sink instance.
      *
      * @param sinkView Sink configuration view.
      * @return Sink instance.
      */
-    public Sink createSink(SinkView sinkView) {
+    Sink createSink(SinkView sinkView) {
         if (sinkView instanceof LogSinkView) {
             return new LogSink((LogSinkView) sinkView);
         }
 
-        throw new IgniteInternalException(Common.INTERNAL_ERR, "Unsupported 
sink type: " + sinkView.id());
+        throw new IgniteInternalException(Common.INTERNAL_ERR, "Unsupported 
sink type: " + sinkView.type());
     }
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java
similarity index 75%
copy from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
copy to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java
index bc8c364a7f..5151ae7b91 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/EventLog.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkRegistry.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.eventlog.api;
+package org.apache.ignite.internal.eventlog.impl;
 
-import java.util.function.Supplier;
+import java.util.Set;
+import org.apache.ignite.internal.eventlog.api.Sink;
 
-/**
- * Logs events into specified sinks.
- * TODO: https://issues.apache.org/jira/browse/IGNITE-21665.
- */
-public interface EventLog {
-    void log(Supplier<Event> eventProvider);
+interface SinkRegistry {
+    Sink getByName(String name);
+
+    Set<Sink> findAllByChannel(String channel);
 }
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
index f0c6b1cca2..c1749521f8 100644
--- 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
 
 import java.util.stream.Stream;
 import org.apache.ignite.internal.eventlog.api.Event;
+import org.apache.ignite.internal.eventlog.api.IgniteEvents;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java
new file mode 100644
index 0000000000..5aeebe1c1f
--- /dev/null
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.impl;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.eventlog.api.EventChannel;
+import org.apache.ignite.internal.eventlog.api.IgniteEventType;
+import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ConfigurationExtension.class)
+class ConfigurationBasedChannelRegistryTest extends BaseIgniteAbstractTest {
+    private static final String TEST_CHANNEL = "testChannel";
+
+    @InjectConfiguration
+    private EventLogConfiguration cfg;
+
+    private ConfigurationBasedChannelRegistry registry;
+
+    @BeforeEach
+    void setUp() {
+        registry = new ConfigurationBasedChannelRegistry(cfg, new 
ConfigurationBasedSinkRegistry(cfg));
+    }
+
+    @Test
+    void noSuchChannel() {
+        assertNull(registry.getByName("noSuchChannel"));
+    }
+
+    @Test
+    void addNewConfigurationEntry() throws Exception {
+        // Given configuration with a channel.
+        cfg.channels().change(c -> c.create(TEST_CHANNEL, s -> {
+            s.changeEnabled(true);
+            s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name());
+        })).get();
+
+        // When get channel from registry.
+        EventChannel channel = registry.getByName(TEST_CHANNEL);
+
+        // Then it is configured correctly.
+        assertThat(channel.types(), 
hasItem(IgniteEventType.USER_AUTHENTICATED.name()));
+    }
+
+    @Test
+    void removeConfigurationEntry() throws Exception {
+        // Given configuration with a channel.
+        cfg.channels().change(c -> c.create(TEST_CHANNEL, s -> {
+            s.changeEnabled(true);
+            s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name());
+        })).get();
+
+        // When remove configuration entry.
+        cfg.change(c -> c.changeChannels().delete(TEST_CHANNEL)).get();
+
+        // Then channel is removed from registry.
+        assertThat(registry.getByName(TEST_CHANNEL), nullValue());
+    }
+
+    @Test
+    void updateConfigurationEntry() throws Exception {
+        // Given configuration with a channel.
+        cfg.channels().change(c -> c.create(TEST_CHANNEL, s -> {
+            s.changeEnabled(true);
+            s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name());
+        })).get();
+
+        assertThat(registry.getByName(TEST_CHANNEL).types(), hasSize(1));
+
+        // When update configuration entry.
+        cfg.channels().change(c -> c.update(TEST_CHANNEL, s -> {
+            s.changeEnabled(true);
+            s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name(), 
IgniteEventType.CONNECTION_CLOSED.name());
+        })).get();
+
+        // Then channel is updated in registry and types are not the same as 
the were before the update.
+        assertThat(registry.getByName(TEST_CHANNEL).types(), hasSize(2));
+    }
+
+    @Test
+    void findAllChannelsByEventType() throws Exception {
+        // Given configuration with a channel.
+        cfg.channels().change(c -> c.create(TEST_CHANNEL, s -> {
+            s.changeEnabled(true);
+            s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name());
+        })).get();
+
+        // Then registry returns the channel by type.
+        
assertThat(registry.findAllChannelsByEventType(IgniteEventType.USER_AUTHENTICATED.name()),
 hasSize(1));
+        // But for another type it returns empty set.
+        
assertThat(registry.findAllChannelsByEventType(IgniteEventType.CONNECTION_CLOSED.name()),
 hasSize(0));
+
+        // When update configuration entry.
+        cfg.channels().change(c -> c.update(TEST_CHANNEL, s -> {
+            s.changeEnabled(true);
+            s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name(), 
IgniteEventType.CONNECTION_CLOSED.name());
+        })).get();
+
+        // Then registry returns the channel by type.
+        
assertThat(registry.findAllChannelsByEventType(IgniteEventType.USER_AUTHENTICATED.name()),
 hasSize(1));
+        
assertThat(registry.findAllChannelsByEventType(IgniteEventType.CONNECTION_CLOSED.name()),
 hasSize(1));
+
+        // When add new channel.
+        cfg.channels().change(c -> c.create("newChannel", s -> {
+            s.changeEnabled(true);
+            s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name());
+        })).get();
+
+        // Then.
+        
assertThat(registry.findAllChannelsByEventType(IgniteEventType.USER_AUTHENTICATED.name()),
 hasSize(2));
+        
assertThat(registry.findAllChannelsByEventType(IgniteEventType.CONNECTION_CLOSED.name()),
 hasSize(1));
+    }
+
+    @Test
+    void enableDisable() throws Exception {
+        // Given configuration with a channel.
+        cfg.channels().change(c -> c.create(TEST_CHANNEL, s -> {
+            s.changeEvents(IgniteEventType.USER_AUTHENTICATED.name());
+        })).get();
+
+        assertThat(registry.getByName(TEST_CHANNEL), not(nullValue()));
+
+        // When disable channel.
+        cfg.channels().change(c -> c.update(TEST_CHANNEL, s -> 
s.changeEnabled(false))).get();
+
+        // Then channel is removed from registry.
+        assertThat(registry.getByName(TEST_CHANNEL), nullValue());
+    }
+}
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java
new file mode 100644
index 0000000000..14bc42d5de
--- /dev/null
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.impl;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ConfigurationExtension.class)
+class ConfigurationBasedSinkRegistryTest extends BaseIgniteAbstractTest {
+    private static final String TEST_CHANNEL = "testChannel";
+    private static final String TEST_SINK = "testSink";
+
+    @InjectConfiguration
+    private EventLogConfiguration cfg;
+
+    private ConfigurationBasedSinkRegistry registry;
+
+    @BeforeEach
+    void setUp() {
+        registry = new ConfigurationBasedSinkRegistry(cfg);
+    }
+
+    @Test
+    void noSuchSink() {
+        assertNull(registry.getByName("noSuchSink"));
+    }
+
+    @Test
+    void addNewConfigurationEntry() throws Exception {
+        // Given configuration with a sink.
+        cfg.sinks().change(c -> c.create(TEST_SINK, s -> {
+            s.changeChannel(TEST_CHANNEL);
+        })).get();
+
+        // Then configuration is updated.
+        assertThat(registry.getByName(TEST_SINK), not(nullValue()));
+    }
+
+    @Test
+    void removeConfigurationEntry() throws Exception {
+        // Given configuration with a sink.
+        cfg.sinks().change(c -> c.create(TEST_SINK, s -> {
+            s.changeChannel(TEST_CHANNEL);
+        })).get();
+
+        assertThat(registry.getByName(TEST_SINK), not(nullValue()));
+
+        // When configuration is removed.
+        cfg.sinks().change(c -> c.delete(TEST_SINK)).get();
+
+        // Then sink is removed from registry .
+        assertThat(registry.getByName(TEST_SINK), nullValue());
+    }
+
+    @Test
+    void updateConfigurationEntry() throws Exception  {
+        // Given configuration with a sink.
+        cfg.sinks().change(c -> c.create(TEST_SINK, s -> {
+            s.changeChannel("some");
+        })).get();
+
+        assertThat(registry.getByName(TEST_SINK), not(nullValue()));
+
+        // And sink can be found by channel.
+        assertThat(registry.findAllByChannel("some"), hasSize(1));
+
+        // When the channel is updated.
+        cfg.sinks().change(c -> c.update(TEST_SINK, s -> {
+            s.changeChannel(TEST_CHANNEL);
+        })).get();
+
+        // Then then the sink can not be found by previous channel.
+        assertThat(registry.findAllByChannel("some"), hasSize(0));
+        // And the sink can be found by new channel.
+        assertThat(registry.findAllByChannel(TEST_CHANNEL), hasSize(1));
+
+        // When add one more sink with the same channel.
+        cfg.sinks().change(c -> c.create("newSink", s -> {
+            s.changeChannel(TEST_CHANNEL);
+        })).get();
+
+        // Then the sink can be found by new channel.
+        assertThat(registry.findAllByChannel(TEST_CHANNEL), hasSize(2));
+    }
+}
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java
new file mode 100644
index 0000000000..54f9f0d217
--- /dev/null
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.impl;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.eventlog.api.Event;
+import org.apache.ignite.internal.eventlog.api.EventChannel;
+import org.apache.ignite.internal.eventlog.api.EventLog;
+import org.apache.ignite.internal.eventlog.api.IgniteEvents;
+import org.apache.ignite.internal.eventlog.api.Sink;
+import org.apache.ignite.internal.eventlog.event.EventUser;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class EventLogTest {
+    private static final EventUser TEST_USER = EventUser.of("testuser", 
"basicAuthenticator");
+    private static final Event TEST_EVENT = 
IgniteEvents.USER_AUTHENTICATED.create(TEST_USER);
+    private static final String TEST_CHANNEL_NAME = "testChannel";
+
+    private EventLog eventLog;
+
+    private TestChannelRegistry channelRegistry;
+    private TestSinkRegistry sinkRegistry;
+    private ChannelFactory channelFactory;
+
+    @BeforeEach
+    void setUp() {
+        channelRegistry = new TestChannelRegistry();
+        sinkRegistry = new TestSinkRegistry();
+        channelFactory = new ChannelFactory(sinkRegistry);
+        eventLog = new EventLogImpl(channelRegistry);
+    }
+
+    @Test
+    void logsEventCorrectly() {
+        // Given no channels and sinks.
+
+        // Then nothing thrown.
+        assertDoesNotThrow(() -> eventLog.log(() -> TEST_EVENT));
+
+        // When add a channel but there is no sink.
+        channelRegistry.register(TEST_CHANNEL_NAME, () -> 
channelFactory.createChannel(
+                TEST_CHANNEL_NAME, Set.of(TEST_EVENT.type()))
+        );
+
+        // Then nothing thrown.
+        assertDoesNotThrow(() -> eventLog.log(() -> TEST_EVENT));
+
+        // When add a sink for the channel.
+        List<Event> container = new ArrayList<>();
+        sinkRegistry.register(TEST_CHANNEL_NAME, container::add);
+
+        // And log event.
+        eventLog.log(() -> TEST_EVENT);
+
+        // Then event is logged.
+        assertThat(container, hasItem(TEST_EVENT));
+
+        // When log event with a type that is not supported by the channel.
+        Event event = IgniteEvents.CONNECTION_CLOSED.create(TEST_USER);
+
+        // Then nothing thrown.
+        assertDoesNotThrow(() -> eventLog.log(() -> event));
+        // And the event is not logged.
+        assertThat(container, not(hasItem(event)));
+    }
+
+    private static class TestChannelRegistry implements ChannelRegistry {
+        private final Map<String, Supplier<EventChannel>> channels;
+
+        private TestChannelRegistry() {
+            channels = new HashMap<>();
+        }
+
+        void register(String name, Supplier<EventChannel> channel) {
+            channels.put(name, channel);
+        }
+
+        @Override
+        public EventChannel getByName(String name) {
+            return channels.get(name).get();
+        }
+
+        @Override
+        public Set<EventChannel> findAllChannelsByEventType(String 
igniteEventType) {
+            return channels.values().stream()
+                    .map(Supplier::get)
+                    .filter(channel -> 
channel.types().contains(igniteEventType))
+                    .collect(HashSet::new, Set::add, Set::addAll);
+        }
+    }
+
+    private static class TestSinkRegistry implements SinkRegistry {
+        private final Map<String, Sink> sinks;
+
+        private TestSinkRegistry() {
+            sinks = new HashMap<>();
+        }
+
+        void register(String name, Sink sink) {
+            sinks.put(name, sink);
+        }
+
+        @Override
+        public Sink getByName(String name) {
+            return sinks.get(name);
+        }
+
+        @Override
+        public Set<Sink> findAllByChannel(String channel) {
+            if (!sinks.containsKey(channel)) {
+                return Set.of();
+            }
+            return Set.of(sinks.get(channel));
+        }
+    }
+}
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/sink/LogSinkTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java
similarity index 96%
rename from 
modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/sink/LogSinkTest.java
rename to 
modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java
index 7e667b653d..ea6aff7909 100644
--- 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/sink/LogSinkTest.java
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.eventlog.sink;
+package org.apache.ignite.internal.eventlog.impl;
 
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -28,10 +28,11 @@ import java.nio.file.Path;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.eventlog.api.Event;
+import org.apache.ignite.internal.eventlog.api.IgniteEvents;
+import org.apache.ignite.internal.eventlog.api.Sink;
 import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
 import org.apache.ignite.internal.eventlog.config.schema.LogSinkChange;
 import org.apache.ignite.internal.eventlog.event.EventUser;
-import org.apache.ignite.internal.eventlog.event.IgniteEvents;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java
index d8096d82ff..9473db1c80 100644
--- 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java
@@ -23,15 +23,15 @@ import static 
uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs;
 import java.util.Map;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.eventlog.api.Event;
+import org.apache.ignite.internal.eventlog.api.IgniteEvents;
 import org.apache.ignite.internal.eventlog.event.EventUser;
-import org.apache.ignite.internal.eventlog.event.IgniteEvents;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 class JsonEventSerializerTest {
-    public static Stream<Arguments> events() {
+    private static Stream<Arguments> events() {
         return Stream.of(
                 Arguments.of(
                         IgniteEvents.CONNECTION_CLOSED.builder()
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index b4ca5bf0b2..cbb8a66b03 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -110,6 +110,8 @@ import 
org.apache.ignite.internal.deployunit.IgniteDeployment;
 import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+import org.apache.ignite.internal.eventlog.impl.EventLogImpl;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.ClockServiceImpl;
@@ -907,8 +909,10 @@ public class IgniteImpl implements Ignite {
     private AuthenticationManager createAuthenticationManager() {
         SecurityConfiguration securityConfiguration = 
clusterCfgMgr.configurationRegistry()
                 .getConfiguration(SecurityConfiguration.KEY);
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-21665
-        return new AuthenticationManagerImpl(securityConfiguration, (ign) -> 
{});
+        EventLogConfiguration eventLogConfiguration = 
clusterCfgMgr.configurationRegistry()
+                .getConfiguration(EventLogConfiguration.KEY);
+
+        return new AuthenticationManagerImpl(securityConfiguration, new 
EventLogImpl(eventLogConfiguration));
     }
 
     private RestComponent createRestComponent(String name) {
diff --git 
a/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java
 
b/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java
index c5faec9cfc..7997c1ea12 100644
--- 
a/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java
+++ 
b/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java
@@ -32,8 +32,8 @@ import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.configuration.notifications.ConfigurationListener;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.eventlog.api.EventLog;
+import org.apache.ignite.internal.eventlog.api.IgniteEvents;
 import org.apache.ignite.internal.eventlog.event.EventUser;
-import org.apache.ignite.internal.eventlog.event.IgniteEvents;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import 
org.apache.ignite.internal.security.authentication.basic.BasicAuthenticationProviderConfiguration;

Reply via email to