Copilot commented on code in PR #10096: URL: https://github.com/apache/ozone/pull/10096#discussion_r3166418342
########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of OMEventListener which uses the + * OMEventListenerLedgerPoller as a building block to periodically poll/consume + * completed operations, serialize them to a S3 schema and produce them + * to a kafka topic. 
+ */ +public class OMEventListenerKafkaPublisher implements OMEventListener { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class); + + private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka."; + private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1; + + private OMEventListenerLedgerPoller ledgerPoller; + private KafkaClientWrapper kafkaClient; + private OMEventListenerLedgerPollerSeekPosition seekPosition; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX); + Properties kafkaProps = new Properties(); + kafkaProps.putAll(kafkaPropsMap); + + this.kafkaClient = new KafkaClientWrapper(kafkaProps); + + // TODO: these constants should be read from config + long kafkaServiceInterval = 2 * 1000; + long kafkaServiceTimeout = 300 * 1000; + + LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + + "serviceTimeout={}, kafkaProps={}, seekPosition={}", + kafkaServiceInterval, kafkaServiceTimeout, kafkaProps, + seekPosition); Review Comment: Logging the full `kafkaProps` can leak sensitive values (eg SASL passwords, tokens). Avoid logging the entire properties map at INFO; log only non-sensitive keys (or redact known sensitive ones) and consider lowering to DEBUG. ########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a helper class which can be used by implementations of + * OMEventListener which uses a background service to read the latest + * completed operations and hand them to a callback method. 
+ */ +public class OMEventListenerLedgerPoller extends BackgroundService { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerLedgerPoller.class); + + private static final int MAX_RESULTS = 10_000; + + private final AtomicBoolean suspended; + private final AtomicLong runCount; + private final AtomicLong successRunCount; + private final OMEventListenerPluginContext pluginContext; + private final OMEventListenerLedgerPollerSeekPosition seekPosition; + private final Consumer<OmCompletedRequestInfo> callback; + + @SuppressWarnings("checkstyle:ParameterNumber") + public OMEventListenerLedgerPoller(long interval, TimeUnit unit, + int poolSize, long serviceTimeout, + OMEventListenerPluginContext pluginContext, + OzoneConfiguration configuration, + OMEventListenerLedgerPollerSeekPosition seekPosition, + Consumer<OmCompletedRequestInfo> callback) { + + super("OMEventListenerLedgerPoller", + interval, + TimeUnit.MILLISECONDS, + poolSize, + serviceTimeout, pluginContext.getThreadNamePrefix()); + + this.suspended = new AtomicBoolean(false); + this.runCount = new AtomicLong(0); + this.successRunCount = new AtomicLong(0); + this.pluginContext = pluginContext; + this.seekPosition = seekPosition; + this.callback = callback; + } + + private boolean shouldRun() { + return pluginContext.isLeaderReady() && !suspended.get(); + } + + /** + * Suspend the service. + */ + @VisibleForTesting + public void suspend() { + suspended.set(true); + } + + /** + * Resume the service if suspended. 
+ */ + @VisibleForTesting + public void resume() { + suspended.set(false); + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + queue.add(new OMEventListenerLedgerPoller.CompletedRequestInfoConsumerTask()); + return queue; + } + + public AtomicLong getRunCount() { + return runCount; + } + + private class CompletedRequestInfoConsumerTask implements BackgroundTask { + + @Override + public int getPriority() { + return 0; + } + + @Override + public BackgroundTaskResult call() { + if (shouldRun()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Running OMEventListenerLedgerPoller"); + } + if (runCount.get() == 0) { + seekPosition.initSeekPosition(); Review Comment: `seekPosition.initSeekPosition()` is called on the first run but its return value is ignored, and `OMEventListenerLedgerPollerSeekPosition#initSeekPosition()` does not update internal state. As written, this first-run init is a no-op; either have `initSeekPosition()` update the AtomicReference, or call `seekPosition.set(...)` with the loaded value. ```suggestion seekPosition.set(seekPosition.initSeekPosition()); ``` ########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a helper class to get/set the seek position used by the + * OMEventListenerLedgerPoller. + * + * XXX: the seek position should be persisted (and ideally distrbuted to Review Comment: Typo in comment: "distrbuted" -> "distributed". ```suggestion * XXX: the seek position should be persisted (and ideally distributed to ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a manager for plugins which implement OMEventListener which + * manages the lifecycle of constructing starting/stopping configured + * plugins. + */ +public class OMEventListenerPluginManager { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerPluginManager.class); + + public static final String PLUGIN_DEST_BASE = "ozone.om.plugin.destination"; + + private final List<OMEventListener> plugins; + + public OMEventListenerPluginManager(OzoneManager ozoneManager, OzoneConfiguration conf) { + this.plugins = loadAll(ozoneManager, conf); + } + + public List<OMEventListener> getLoaded() { + return plugins; + } + + public void startAll() { + for (OMEventListener plugin : plugins) { + plugin.start(); + } + } + + public void shutdownAll() { + for (OMEventListener plugin : plugins) { + plugin.shutdown(); + } + } + + // Configuration is based on ranger plugins + // + // For example, a plugin named FooPlugin would be configured via + // OzoneConfiguration properties as follows: + // + // conf.set("ozone.om.plugin.destination.foo", "enabled"); + // conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"); + // + static List<OMEventListener> loadAll(OzoneManager ozoneManager, OzoneConfiguration conf) { + List<OMEventListener> plugins = new ArrayList<>(); + + Map<String, String> props = conf.getPropsMatchPrefixAndTrimPrefix(PLUGIN_DEST_BASE); + List<String> destNameList = new ArrayList<>(); + for (Map.Entry<String, String> entry : props.entrySet()) { + String destName = entry.getKey(); + String value = entry.getValue(); + LOG.info("Found event listener plugin with name={} and value={}", destName, 
value); + + if (value.equalsIgnoreCase("enable") || value.equalsIgnoreCase("enabled") || value.equalsIgnoreCase("true")) { + destNameList.add(destName); + LOG.info("Event listener plugin {}{} is set to {}", PLUGIN_DEST_BASE, destName, value); + } + } + + OMEventListenerPluginContext pluginContext = new OMEventListenerPluginContextImpl(ozoneManager); + + for (String destName : destNameList) { + try { + Class<? extends OMEventListener> cls = resolvePluginClass(conf, destName); + LOG.info("Event listener plugin class is {}", cls); + + OMEventListener impl = cls.newInstance(); + impl.initialize(conf, pluginContext); + + plugins.add(impl); + } catch (Exception ex) { + LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex); + } + } + + return plugins; + } + + private static Class<? extends OMEventListener> resolvePluginClass(OzoneConfiguration conf, + String destName) { + String classnameProp = PLUGIN_DEST_BASE + destName + ".classname"; Review Comment: `classnameProp` is built by concatenation without a `.` between `PLUGIN_DEST_BASE` and `destName`, which currently only works because `destName` includes a leading dot. If the prefix handling changes, this will silently generate the wrong property key (eg `...destinationfoo.classname`). Prefer constructing the property as `PLUGIN_DEST_BASE + '.' + destName + ".classname"` (or using a base prefix that already ends with `.`). ```suggestion String normalizedDestName = destName.startsWith(".") ? destName : "." + destName; String classnameProp = PLUGIN_DEST_BASE + normalizedDestName + ".classname"; ``` ########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of OMEventListener which uses the + * OMEventListenerLedgerPoller as a building block to periodically poll/consume + * completed operations, serialize them to a S3 schema and produce them + * to a kafka topic. 
+ */ +public class OMEventListenerKafkaPublisher implements OMEventListener { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class); + + private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka."; + private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1; + + private OMEventListenerLedgerPoller ledgerPoller; + private KafkaClientWrapper kafkaClient; + private OMEventListenerLedgerPollerSeekPosition seekPosition; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX); + Properties kafkaProps = new Properties(); + kafkaProps.putAll(kafkaPropsMap); + + this.kafkaClient = new KafkaClientWrapper(kafkaProps); + + // TODO: these constants should be read from config + long kafkaServiceInterval = 2 * 1000; + long kafkaServiceTimeout = 300 * 1000; + + LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + + "serviceTimeout={}, kafkaProps={}, seekPosition={}", + kafkaServiceInterval, kafkaServiceTimeout, kafkaProps, + seekPosition); + + this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(); + + this.ledgerPoller = new OMEventListenerLedgerPoller( + kafkaServiceInterval, TimeUnit.MILLISECONDS, + COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE, + kafkaServiceTimeout, pluginContext, conf, + seekPosition, + this::handleCompletedRequest); + } + + @Override + public void start() { + ledgerPoller.start(); + + try { + kafkaClient.initialize(); + } catch (IOException ex) { + LOG.error("Failure initializing kafka client", ex); + } Review Comment: `start()` starts the ledger poller before the Kafka client is initialized. If polling begins first, events can be processed while the producer is still null and later state (seek position) may be advanced incorrectly. Initialize Kafka first (and only start polling after it succeeds). 
```suggestion try { kafkaClient.initialize(); } catch (IOException ex) { LOG.error("Failure initializing kafka client", ex); return; } ledgerPoller.start(); ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java: ########## @@ -1340,6 +1340,58 @@ private List<OmVolumeArgs> listAllVolumes(String prefix, String startKey, return result; } + /** + * {@inheritDoc} + */ + @Override + public List<OmCompletedRequestInfo> listCompletedRequestInfo(final String startKey, + final int maxResults) + throws IOException { + List<OmCompletedRequestInfo> results = new ArrayList<>(); + + Table.KeyValue<Long, OmCompletedRequestInfo> completedRequestInfoRow; + try (TableIterator<Long, ? extends Table.KeyValue<Long, OmCompletedRequestInfo>> + tableIterator = getCompletedRequestInfoTable().iterator()) { + + boolean skipFirst = false; + if (StringUtils.isNotBlank(startKey)) { + // TODO: what happens if the seek position is no longer + // available? Do we go to the end of the list + // or the first key > startKey + tableIterator.seek(Long.valueOf(startKey)); + skipFirst = true; + } + + while (tableIterator.hasNext() && results.size() < maxResults) { + completedRequestInfoRow = tableIterator.next(); + // this is the first loop iteration after the seek so we + // need to skip the record we seeked to (if it is still + // present) + if (skipFirst) { + skipFirst = false; + // NOTE: I'm assuming that we need to conditionally do this + // only if it is equal to what we wanted to seek to (hence + if (!Objects.equals(startKey, completedRequestInfoRow.getKey())) { + // when we have a startKey we expect the first result Review Comment: `startKey` is a `String` but `completedRequestInfoRow.getKey()` is a `Long`; `Objects.equals(startKey, completedRequestInfoRow.getKey())` will always be false and will always throw when `startKey` is provided. Compare like types (eg parse `startKey` to `Long` once) before treating the row as missing. 
########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a manager for plugins which implement OMEventListener which + * manages the lifecycle of constructing starting/stopping configured + * plugins. 
+ */ +public class OMEventListenerPluginManager { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerPluginManager.class); + + public static final String PLUGIN_DEST_BASE = "ozone.om.plugin.destination"; + + private final List<OMEventListener> plugins; + + public OMEventListenerPluginManager(OzoneManager ozoneManager, OzoneConfiguration conf) { + this.plugins = loadAll(ozoneManager, conf); + } + + public List<OMEventListener> getLoaded() { + return plugins; + } + + public void startAll() { + for (OMEventListener plugin : plugins) { + plugin.start(); + } + } + + public void shutdownAll() { + for (OMEventListener plugin : plugins) { + plugin.shutdown(); + } + } + + // Configuration is based on ranger plugins + // + // For example, a plugin named FooPlugin would be configured via + // OzoneConfiguration properties as follows: + // + // conf.set("ozone.om.plugin.destination.foo", "enabled"); + // conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"); + // + static List<OMEventListener> loadAll(OzoneManager ozoneManager, OzoneConfiguration conf) { + List<OMEventListener> plugins = new ArrayList<>(); + + Map<String, String> props = conf.getPropsMatchPrefixAndTrimPrefix(PLUGIN_DEST_BASE); + List<String> destNameList = new ArrayList<>(); Review Comment: `getPropsMatchPrefixAndTrimPrefix` is typically called with a prefix ending in `.` so trimmed keys don't start with a leading dot (eg `RatisHelper#getDatanodeRatisPrefixProps` uses `prefix + '.'`). Using `PLUGIN_DEST_BASE` without the dot makes the trimmed destination names start with `.`, and the rest of this class relies on that implicit formatting. Consider changing `PLUGIN_DEST_BASE` to include the trailing dot and updating downstream key construction accordingly. 
########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a manager for plugins which implement OMEventListener which + * manages the lifecycle of constructing starting/stopping configured + * plugins. 
+ */ +public class OMEventListenerPluginManager { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerPluginManager.class); + + public static final String PLUGIN_DEST_BASE = "ozone.om.plugin.destination"; + + private final List<OMEventListener> plugins; + + public OMEventListenerPluginManager(OzoneManager ozoneManager, OzoneConfiguration conf) { + this.plugins = loadAll(ozoneManager, conf); + } + + public List<OMEventListener> getLoaded() { + return plugins; + } + + public void startAll() { + for (OMEventListener plugin : plugins) { + plugin.start(); + } + } + + public void shutdownAll() { + for (OMEventListener plugin : plugins) { + plugin.shutdown(); + } + } + + // Configuration is based on ranger plugins + // + // For example, a plugin named FooPlugin would be configured via + // OzoneConfiguration properties as follows: + // + // conf.set("ozone.om.plugin.destination.foo", "enabled"); + // conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"); + // + static List<OMEventListener> loadAll(OzoneManager ozoneManager, OzoneConfiguration conf) { + List<OMEventListener> plugins = new ArrayList<>(); + + Map<String, String> props = conf.getPropsMatchPrefixAndTrimPrefix(PLUGIN_DEST_BASE); + List<String> destNameList = new ArrayList<>(); + for (Map.Entry<String, String> entry : props.entrySet()) { + String destName = entry.getKey(); + String value = entry.getValue(); + LOG.info("Found event listener plugin with name={} and value={}", destName, value); + + if (value.equalsIgnoreCase("enable") || value.equalsIgnoreCase("enabled") || value.equalsIgnoreCase("true")) { + destNameList.add(destName); + LOG.info("Event listener plugin {}{} is set to {}", PLUGIN_DEST_BASE, destName, value); + } + } + + OMEventListenerPluginContext pluginContext = new OMEventListenerPluginContextImpl(ozoneManager); + + for (String destName : destNameList) { + try { + Class<? 
extends OMEventListener> cls = resolvePluginClass(conf, destName); + LOG.info("Event listener plugin class is {}", cls); + + OMEventListener impl = cls.newInstance(); + impl.initialize(conf, pluginContext); + + plugins.add(impl); + } catch (Exception ex) { + LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex); + } + } + + return plugins; + } + + private static Class<? extends OMEventListener> resolvePluginClass(OzoneConfiguration conf, + String destName) { + String classnameProp = PLUGIN_DEST_BASE + destName + ".classname"; + LOG.info("Gettting classname for {} with propety {}", destName, classnameProp); Review Comment: Typos in log message: "Gettting" -> "Getting" and "propety" -> "property". ```suggestion LOG.info("Getting classname for {} with property {}", destName, classnameProp); ``` ########## hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.util.Time; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Tests {@link OMEventListenerPluginManager}. + */ +@ExtendWith(MockitoExtension.class) +public class TestOMEventListenerKafkaPublisher { + + private static final String VOLUME_NAME = "vol1"; + private static final String BUCKET_NAME = "bucket1"; + + @Mock + private OMEventListenerPluginContext pluginContext; + + // helper to create json key/val string for non exhaustive JSON + // attribute checking + private static String toJsonKeyVal(String key, String val) { + return new StringBuilder() + .append('\"') + .append(key) + .append('\"') + .append(':') + .append('\"') + .append(val) + .append('\"') + .toString(); + } + + private static OmCompletedRequestInfo buildCompletedRequestInfo( + long trxLogIndex, Type cmdType, String keyName, OperationArgs opArgs) { + + return new OmCompletedRequestInfo.Builder() + .setTrxLogIndex(trxLogIndex) + .setCmdType(cmdType) + .setVolumeName(VOLUME_NAME) + .setBucketName(BUCKET_NAME) + .setKeyName(keyName) + .setCreationTime(Time.now()) + .setOpArgs(new OperationArgs.NoArgs()) + .build(); + } + + private List<String> 
captureEventsProducedByOperation(OmCompletedRequestInfo op, int expectEvents) + throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.notify.kafka.topic", "abc"); + + List<String> events = new ArrayList<>(); + + OMEventListenerKafkaPublisher plugin = new OMEventListenerKafkaPublisher(); + try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockeKafkaClientWrapper = + mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) { + + plugin.initialize(conf, pluginContext); + plugin.handleCompletedRequest(op); + + OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockeKafkaClientWrapper.constructed().get(0); Review Comment: Typo in variable name `mockeKafkaClientWrapper` (missing `d`). Consider renaming to `mockedKafkaClientWrapper` for clarity. ```suggestion try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockedKafkaClientWrapper = mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) { plugin.initialize(conf, pluginContext); plugin.handleCompletedRequest(op); OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockedKafkaClientWrapper.constructed().get(0); ``` ########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of OMEventListener which uses the + * OMEventListenerLedgerPoller as a building block to periodically poll/consume + * completed operations, serialize them to a S3 schema and produce them + * to a kafka topic. 
+ */ +public class OMEventListenerKafkaPublisher implements OMEventListener { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class); + + private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka."; + private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1; + + private OMEventListenerLedgerPoller ledgerPoller; + private KafkaClientWrapper kafkaClient; + private OMEventListenerLedgerPollerSeekPosition seekPosition; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX); + Properties kafkaProps = new Properties(); + kafkaProps.putAll(kafkaPropsMap); + + this.kafkaClient = new KafkaClientWrapper(kafkaProps); + + // TODO: these constants should be read from config + long kafkaServiceInterval = 2 * 1000; + long kafkaServiceTimeout = 300 * 1000; + + LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + + "serviceTimeout={}, kafkaProps={}, seekPosition={}", + kafkaServiceInterval, kafkaServiceTimeout, kafkaProps, + seekPosition); + + this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(); + + this.ledgerPoller = new OMEventListenerLedgerPoller( + kafkaServiceInterval, TimeUnit.MILLISECONDS, + COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE, + kafkaServiceTimeout, pluginContext, conf, + seekPosition, + this::handleCompletedRequest); + } + + @Override + public void start() { + ledgerPoller.start(); + + try { + kafkaClient.initialize(); + } catch (IOException ex) { + LOG.error("Failure initializing kafka client", ex); + } + } + + @Override + public void shutdown() { + try { + kafkaClient.shutdown(); + } catch (IOException ex) { + LOG.error("Failure shutting down kafka client", ex); + } + + ledgerPoller.shutdown(); + } + + // callback called by OMEventListenerLedgerPoller + public void handleCompletedRequest(OmCompletedRequestInfo 
completedRequestInfo) { + LOG.info("Processing {}", completedRequestInfo); + + // stub event until we implement a strategy to convert the events to + // a user facing schema (e.g. S3) + String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}", + completedRequestInfo.getVolumeName(), + completedRequestInfo.getBucketName(), + completedRequestInfo.getKeyName(), + String.valueOf(completedRequestInfo.getCmdType())); + + LOG.info("Sending {}", event); + + try { + kafkaClient.send(event); + } catch (IOException ex) { + LOG.error("Failure to send event {}", event, ex); + return; + } + + // we can update the seek position + seekPosition.set(String.valueOf(completedRequestInfo.getTrxLogIndex())); + } + + static class KafkaClientWrapper { + public static final Logger LOG = LoggerFactory.getLogger(KafkaClientWrapper.class); + + private final String topic; + private final Properties kafkaProps; + + private KafkaProducer<String, String> producer; + + KafkaClientWrapper(Properties kafkaProps) { + this.topic = (String) kafkaProps.get("topic"); + this.kafkaProps = kafkaProps; + } + + public void initialize() throws IOException { + LOG.info("Initializing with properties {}", kafkaProps); + this.producer = new KafkaProducer<>(kafkaProps); + + ensureTopicExists(); + } + + public void shutdown() throws IOException { + producer.close(); + } + + public void send(String message) throws IOException { + if (producer != null) { + LOG.info("Producing event {}", message); + ProducerRecord<String, String> producerRecord = + new ProducerRecord<>(topic, message); + producer.send(producerRecord); + } else { + LOG.warn("Producing event {} [KAFKA DOWN]", message); Review Comment: If `producer == null`, this method only logs a warning and returns, which makes upstream code think the send succeeded (and it may still advance the seek position). Return a failure signal or throw so callers can retry and avoid committing state on failed sends. 
```suggestion LOG.warn("Producing event {} [KAFKA DOWN]", message); throw new IOException("Kafka producer is not initialized"); ``` ########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of OMEventListener which uses the + * OMEventListenerLedgerPoller as a building block to periodically poll/consume + * completed operations, serialize them to a S3 schema and produce them + * to a kafka topic. 
+ */ +public class OMEventListenerKafkaPublisher implements OMEventListener { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class); + + private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka."; + private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1; + + private OMEventListenerLedgerPoller ledgerPoller; + private KafkaClientWrapper kafkaClient; + private OMEventListenerLedgerPollerSeekPosition seekPosition; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX); + Properties kafkaProps = new Properties(); + kafkaProps.putAll(kafkaPropsMap); + + this.kafkaClient = new KafkaClientWrapper(kafkaProps); + + // TODO: these constants should be read from config + long kafkaServiceInterval = 2 * 1000; + long kafkaServiceTimeout = 300 * 1000; + + LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + + "serviceTimeout={}, kafkaProps={}, seekPosition={}", + kafkaServiceInterval, kafkaServiceTimeout, kafkaProps, + seekPosition); + + this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(); + + this.ledgerPoller = new OMEventListenerLedgerPoller( + kafkaServiceInterval, TimeUnit.MILLISECONDS, + COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE, + kafkaServiceTimeout, pluginContext, conf, + seekPosition, + this::handleCompletedRequest); + } + + @Override + public void start() { + ledgerPoller.start(); + + try { + kafkaClient.initialize(); + } catch (IOException ex) { + LOG.error("Failure initializing kafka client", ex); + } + } + + @Override + public void shutdown() { + try { + kafkaClient.shutdown(); + } catch (IOException ex) { + LOG.error("Failure shutting down kafka client", ex); + } + + ledgerPoller.shutdown(); + } + + // callback called by OMEventListenerLedgerPoller + public void handleCompletedRequest(OmCompletedRequestInfo 
completedRequestInfo) { + LOG.info("Processing {}", completedRequestInfo); + + // stub event until we implement a strategy to convert the events to + // a user facing schema (e.g. S3) + String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}", + completedRequestInfo.getVolumeName(), + completedRequestInfo.getBucketName(), + completedRequestInfo.getKeyName(), + String.valueOf(completedRequestInfo.getCmdType())); Review Comment: Building JSON via `String.format` without escaping can emit invalid JSON if values contain quotes/backslashes/control characters. Use a JSON serializer (eg Jackson) to build the payload safely. ########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of OMEventListener which uses the + * OMEventListenerLedgerPoller as a building block to periodically poll/consume + * completed operations, serialize them to a S3 schema and produce them + * to a kafka topic. + */ +public class OMEventListenerKafkaPublisher implements OMEventListener { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class); + + private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka."; + private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1; + + private OMEventListenerLedgerPoller ledgerPoller; + private KafkaClientWrapper kafkaClient; + private OMEventListenerLedgerPollerSeekPosition seekPosition; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX); + Properties kafkaProps = new Properties(); + kafkaProps.putAll(kafkaPropsMap); + + this.kafkaClient = new KafkaClientWrapper(kafkaProps); + + // TODO: these constants should be read from config + long kafkaServiceInterval = 2 * 1000; + long kafkaServiceTimeout = 300 * 1000; + + LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + + "serviceTimeout={}, kafkaProps={}, seekPosition={}", + kafkaServiceInterval, 
kafkaServiceTimeout, kafkaProps, + seekPosition); + + this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(); + + this.ledgerPoller = new OMEventListenerLedgerPoller( + kafkaServiceInterval, TimeUnit.MILLISECONDS, + COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE, + kafkaServiceTimeout, pluginContext, conf, + seekPosition, + this::handleCompletedRequest); + } + + @Override + public void start() { + ledgerPoller.start(); + + try { + kafkaClient.initialize(); + } catch (IOException ex) { + LOG.error("Failure initializing kafka client", ex); + } + } + + @Override + public void shutdown() { + try { + kafkaClient.shutdown(); + } catch (IOException ex) { + LOG.error("Failure shutting down kafka client", ex); + } + + ledgerPoller.shutdown(); + } + + // callback called by OMEventListenerLedgerPoller + public void handleCompletedRequest(OmCompletedRequestInfo completedRequestInfo) { + LOG.info("Processing {}", completedRequestInfo); + + // stub event until we implement a strategy to convert the events to + // a user facing schema (e.g. S3) + String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}", + completedRequestInfo.getVolumeName(), + completedRequestInfo.getBucketName(), + completedRequestInfo.getKeyName(), + String.valueOf(completedRequestInfo.getCmdType())); + + LOG.info("Sending {}", event); + + try { + kafkaClient.send(event); + } catch (IOException ex) { Review Comment: The `send` error handling only catches `IOException`, but Kafka send failures are typically runtime exceptions or async failures (from the returned future). Catch/handle the relevant exceptions and avoid advancing the seek position unless the send is confirmed successful. 
```suggestion } catch (IOException | RuntimeException ex) { ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a manager for plugins which implement OMEventListener which + * manages the lifecycle of constructing starting/stopping configured + * plugins. 
+ */ +public class OMEventListenerPluginManager { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerPluginManager.class); + + public static final String PLUGIN_DEST_BASE = "ozone.om.plugin.destination"; + + private final List<OMEventListener> plugins; + + public OMEventListenerPluginManager(OzoneManager ozoneManager, OzoneConfiguration conf) { + this.plugins = loadAll(ozoneManager, conf); + } + + public List<OMEventListener> getLoaded() { + return plugins; + } + + public void startAll() { + for (OMEventListener plugin : plugins) { + plugin.start(); + } + } + + public void shutdownAll() { + for (OMEventListener plugin : plugins) { + plugin.shutdown(); + } + } + + // Configuration is based on ranger plugins + // + // For example, a plugin named FooPlugin would be configured via + // OzoneConfiguration properties as follows: + // + // conf.set("ozone.om.plugin.destination.foo", "enabled"); + // conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"); + // + static List<OMEventListener> loadAll(OzoneManager ozoneManager, OzoneConfiguration conf) { + List<OMEventListener> plugins = new ArrayList<>(); + + Map<String, String> props = conf.getPropsMatchPrefixAndTrimPrefix(PLUGIN_DEST_BASE); + List<String> destNameList = new ArrayList<>(); + for (Map.Entry<String, String> entry : props.entrySet()) { + String destName = entry.getKey(); + String value = entry.getValue(); + LOG.info("Found event listener plugin with name={} and value={}", destName, value); + + if (value.equalsIgnoreCase("enable") || value.equalsIgnoreCase("enabled") || value.equalsIgnoreCase("true")) { + destNameList.add(destName); + LOG.info("Event listener plugin {}{} is set to {}", PLUGIN_DEST_BASE, destName, value); + } + } + + OMEventListenerPluginContext pluginContext = new OMEventListenerPluginContextImpl(ozoneManager); + + for (String destName : destNameList) { + try { + Class<? 
extends OMEventListener> cls = resolvePluginClass(conf, destName); + LOG.info("Event listener plugin class is {}", cls); + + OMEventListener impl = cls.newInstance(); + impl.initialize(conf, pluginContext); + + plugins.add(impl); + } catch (Exception ex) { + LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex); + } + } + + return plugins; + } + + private static Class<? extends OMEventListener> resolvePluginClass(OzoneConfiguration conf, + String destName) { + String classnameProp = PLUGIN_DEST_BASE + destName + ".classname"; + LOG.info("Gettting classname for {} with propety {}", destName, classnameProp); Review Comment: Typos in log message: "Gettting" -> "Getting" and "propety" -> "property". ```suggestion LOG.info("Getting classname for {} with property {}", destName, classnameProp); ``` ########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of OMEventListener which uses the + * OMEventListenerLedgerPoller as a building block to periodically poll/consume + * completed operations, serialize them to a S3 schema and produce them + * to a kafka topic. + */ +public class OMEventListenerKafkaPublisher implements OMEventListener { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class); + + private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka."; + private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1; + + private OMEventListenerLedgerPoller ledgerPoller; + private KafkaClientWrapper kafkaClient; + private OMEventListenerLedgerPollerSeekPosition seekPosition; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX); + Properties kafkaProps = new Properties(); + kafkaProps.putAll(kafkaPropsMap); + + this.kafkaClient = new KafkaClientWrapper(kafkaProps); + + // TODO: these constants should be read from config + long kafkaServiceInterval = 2 * 1000; + long kafkaServiceTimeout = 300 * 1000; + + LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + + "serviceTimeout={}, kafkaProps={}, seekPosition={}", + kafkaServiceInterval, 
kafkaServiceTimeout, kafkaProps, + seekPosition); + + this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(); + + this.ledgerPoller = new OMEventListenerLedgerPoller( + kafkaServiceInterval, TimeUnit.MILLISECONDS, + COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE, + kafkaServiceTimeout, pluginContext, conf, + seekPosition, + this::handleCompletedRequest); + } + + @Override + public void start() { + ledgerPoller.start(); + + try { + kafkaClient.initialize(); + } catch (IOException ex) { + LOG.error("Failure initializing kafka client", ex); + } + } + + @Override + public void shutdown() { + try { + kafkaClient.shutdown(); + } catch (IOException ex) { + LOG.error("Failure shutting down kafka client", ex); + } + + ledgerPoller.shutdown(); + } + + // callback called by OMEventListenerLedgerPoller + public void handleCompletedRequest(OmCompletedRequestInfo completedRequestInfo) { + LOG.info("Processing {}", completedRequestInfo); + + // stub event until we implement a strategy to convert the events to + // a user facing schema (e.g. 
S3) + String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}", + completedRequestInfo.getVolumeName(), + completedRequestInfo.getBucketName(), + completedRequestInfo.getKeyName(), + String.valueOf(completedRequestInfo.getCmdType())); + + LOG.info("Sending {}", event); + + try { + kafkaClient.send(event); + } catch (IOException ex) { + LOG.error("Failure to send event {}", event, ex); + return; + } + + // we can update the seek position + seekPosition.set(String.valueOf(completedRequestInfo.getTrxLogIndex())); + } + + static class KafkaClientWrapper { + public static final Logger LOG = LoggerFactory.getLogger(KafkaClientWrapper.class); + + private final String topic; + private final Properties kafkaProps; + + private KafkaProducer<String, String> producer; + + KafkaClientWrapper(Properties kafkaProps) { + this.topic = (String) kafkaProps.get("topic"); + this.kafkaProps = kafkaProps; + } + + public void initialize() throws IOException { + LOG.info("Initializing with properties {}", kafkaProps); + this.producer = new KafkaProducer<>(kafkaProps); + + ensureTopicExists(); + } + + public void shutdown() throws IOException { + producer.close(); + } + + public void send(String message) throws IOException { + if (producer != null) { + LOG.info("Producing event {}", message); + ProducerRecord<String, String> producerRecord = + new ProducerRecord<>(topic, message); + producer.send(producerRecord); + } else { + LOG.warn("Producing event {} [KAFKA DOWN]", message); + } + } + + private void ensureTopicExists() { + try (AdminClient adminClient = AdminClient.create(kafkaProps)) { + LOG.info("Creating kafka topic: {}", this.topic); + NewTopic newTopic = new NewTopic(this.topic, 1, (short) 1); + adminClient.createTopics(Collections.singleton(newTopic)).all().get(); + adminClient.close(); + } catch (Exception ex) { + LOG.error("Failed to create topic: {}", this.topic, ex); Review Comment: `ensureTopicExists()` always attempts topic creation and logs an error for 
expected cases like “topic already exists”, which can spam logs on every OM start. Handle `TopicExistsException` (or equivalent) as a non-error and only log unexpected failures at ERROR. ########## hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.util.Time; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Tests {@link OMEventListenerPluginManager}. + */ +@ExtendWith(MockitoExtension.class) +public class TestOMEventListenerKafkaPublisher { + + private static final String VOLUME_NAME = "vol1"; + private static final String BUCKET_NAME = "bucket1"; + + @Mock + private OMEventListenerPluginContext pluginContext; + + // helper to create json key/val string for non exhaustive JSON + // attribute checking + private static String toJsonKeyVal(String key, String val) { + return new StringBuilder() + .append('\"') + .append(key) + .append('\"') + .append(':') + .append('\"') + .append(val) + .append('\"') + .toString(); + } + + private static OmCompletedRequestInfo buildCompletedRequestInfo( + long trxLogIndex, Type cmdType, String keyName, OperationArgs opArgs) { + + return new OmCompletedRequestInfo.Builder() + .setTrxLogIndex(trxLogIndex) + .setCmdType(cmdType) + .setVolumeName(VOLUME_NAME) + .setBucketName(BUCKET_NAME) + .setKeyName(keyName) + .setCreationTime(Time.now()) + .setOpArgs(new OperationArgs.NoArgs()) Review Comment: `buildCompletedRequestInfo(..., OperationArgs 
opArgs)` ignores the `opArgs` parameter and always sets `NoArgs`, so tests that pass `CreateFileArgs`/`RenameKeyArgs` don’t actually exercise those cases. Use the provided `opArgs` when building the request info. ```suggestion .setOpArgs(opArgs) ``` ########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of OMEventListener which uses the + * OMEventListenerLedgerPoller as a building block to periodically poll/consume + * completed operations, serialize them to a S3 schema and produce them + * to a kafka topic. + */ +public class OMEventListenerKafkaPublisher implements OMEventListener { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class); + + private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka."; + private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1; + + private OMEventListenerLedgerPoller ledgerPoller; + private KafkaClientWrapper kafkaClient; + private OMEventListenerLedgerPollerSeekPosition seekPosition; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX); + Properties kafkaProps = new Properties(); + kafkaProps.putAll(kafkaPropsMap); + + this.kafkaClient = new KafkaClientWrapper(kafkaProps); + + // TODO: these constants should be read from config + long kafkaServiceInterval = 2 * 1000; + long kafkaServiceTimeout = 300 * 1000; + + LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + + "serviceTimeout={}, kafkaProps={}, seekPosition={}", + kafkaServiceInterval, 
kafkaServiceTimeout, kafkaProps, + seekPosition); + + this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(); + + this.ledgerPoller = new OMEventListenerLedgerPoller( + kafkaServiceInterval, TimeUnit.MILLISECONDS, + COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE, + kafkaServiceTimeout, pluginContext, conf, + seekPosition, + this::handleCompletedRequest); + } + + @Override + public void start() { + ledgerPoller.start(); + + try { + kafkaClient.initialize(); + } catch (IOException ex) { + LOG.error("Failure initializing kafka client", ex); + } + } + + @Override + public void shutdown() { + try { + kafkaClient.shutdown(); + } catch (IOException ex) { + LOG.error("Failure shutting down kafka client", ex); + } + + ledgerPoller.shutdown(); + } + + // callback called by OMEventListenerLedgerPoller + public void handleCompletedRequest(OmCompletedRequestInfo completedRequestInfo) { + LOG.info("Processing {}", completedRequestInfo); + + // stub event until we implement a strategy to convert the events to + // a user facing schema (e.g. 
S3) + String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}", + completedRequestInfo.getVolumeName(), + completedRequestInfo.getBucketName(), + completedRequestInfo.getKeyName(), + String.valueOf(completedRequestInfo.getCmdType())); + + LOG.info("Sending {}", event); + + try { + kafkaClient.send(event); + } catch (IOException ex) { + LOG.error("Failure to send event {}", event, ex); + return; + } + + // we can update the seek position + seekPosition.set(String.valueOf(completedRequestInfo.getTrxLogIndex())); + } + + static class KafkaClientWrapper { + public static final Logger LOG = LoggerFactory.getLogger(KafkaClientWrapper.class); + + private final String topic; + private final Properties kafkaProps; + + private KafkaProducer<String, String> producer; + + KafkaClientWrapper(Properties kafkaProps) { + this.topic = (String) kafkaProps.get("topic"); + this.kafkaProps = kafkaProps; + } + + public void initialize() throws IOException { + LOG.info("Initializing with properties {}", kafkaProps); + this.producer = new KafkaProducer<>(kafkaProps); + Review Comment: `KafkaClientWrapper.initialize()` logs the full properties set, which may include credentials. Please redact/omit sensitive config values from logs. ########## hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Tests {@link OMEventListenerPluginManager}. + */ +@ExtendWith(MockitoExtension.class) +public class TestOMEventListenerPluginManager { + + @Mock + private OzoneManager ozoneManager; + + static List<String> getLoadedPlugins(OMEventListenerPluginManager pluginManager) { + List<String> loadedClasses = new ArrayList<>(); + for (OMEventListener plugin : pluginManager.getLoaded()) { + loadedClasses.add(plugin.getClass().getName()); + } + + // normalize + Collections.sort(loadedClasses); + + return loadedClasses; + } + + private static class BrokenFooPlugin { + + } Review Comment: `BrokenFooPlugin` is declared as a nested class, so its binary name is `TestOMEventListenerPluginManager$BrokenFooPlugin`. Using `org.apache.hadoop.ozone.om.eventlistener.BrokenFooPlugin` in the config will not resolve, so this test doesn’t actually cover the “class exists but doesn’t implement OMEventListener” case. 
########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a manager for plugins which implement OMEventListener which + * manages the lifecycle of constructing starting/stopping configured + * plugins. 
+ */ +public class OMEventListenerPluginManager { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerPluginManager.class); + + public static final String PLUGIN_DEST_BASE = "ozone.om.plugin.destination"; + + private final List<OMEventListener> plugins; + + public OMEventListenerPluginManager(OzoneManager ozoneManager, OzoneConfiguration conf) { + this.plugins = loadAll(ozoneManager, conf); + } Review Comment: This manager is never instantiated from OM production code (only referenced in tests), so configured `ozone.om.plugin.destination.*` listeners will not actually be loaded/started at runtime. If the PR intends Kafka publishing to be functional, wire `OMEventListenerPluginManager` into `OzoneManager` startup/shutdown (or document the intended entry point). ########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of OMEventListener which uses the + * OMEventListenerLedgerPoller as a building block to periodically poll/consume + * completed operations, serialize them to a S3 schema and produce them + * to a kafka topic. + */ +public class OMEventListenerKafkaPublisher implements OMEventListener { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class); + + private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka."; + private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1; + + private OMEventListenerLedgerPoller ledgerPoller; + private KafkaClientWrapper kafkaClient; + private OMEventListenerLedgerPollerSeekPosition seekPosition; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX); + Properties kafkaProps = new Properties(); + kafkaProps.putAll(kafkaPropsMap); + + this.kafkaClient = new KafkaClientWrapper(kafkaProps); + + // TODO: these constants should be read from config + long kafkaServiceInterval = 2 * 1000; + long kafkaServiceTimeout = 300 * 1000; + + LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + + "serviceTimeout={}, kafkaProps={}, seekPosition={}", + kafkaServiceInterval, 
kafkaServiceTimeout, kafkaProps, + seekPosition); + + this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(); + + this.ledgerPoller = new OMEventListenerLedgerPoller( + kafkaServiceInterval, TimeUnit.MILLISECONDS, + COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE, + kafkaServiceTimeout, pluginContext, conf, + seekPosition, + this::handleCompletedRequest); + } + + @Override + public void start() { + ledgerPoller.start(); + + try { + kafkaClient.initialize(); + } catch (IOException ex) { + LOG.error("Failure initializing kafka client", ex); + } + } + + @Override + public void shutdown() { + try { + kafkaClient.shutdown(); + } catch (IOException ex) { + LOG.error("Failure shutting down kafka client", ex); + } + + ledgerPoller.shutdown(); + } + + // callback called by OMEventListenerLedgerPoller + public void handleCompletedRequest(OmCompletedRequestInfo completedRequestInfo) { + LOG.info("Processing {}", completedRequestInfo); + + // stub event until we implement a strategy to convert the events to + // a user facing schema (e.g. S3) + String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}", + completedRequestInfo.getVolumeName(), + completedRequestInfo.getBucketName(), + completedRequestInfo.getKeyName(), + String.valueOf(completedRequestInfo.getCmdType())); + + LOG.info("Sending {}", event); + + try { + kafkaClient.send(event); + } catch (IOException ex) { Review Comment: `producer.send(...)` is asynchronous; failures may surface after this call returns. Since this method advances the seek position later, consider waiting for the send to complete successfully (or using a callback) before treating the event as committed. ########## hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.util.Time; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Tests {@link OMEventListenerPluginManager}. Review Comment: Javadoc references the wrong class: this test suite targets `OMEventListenerKafkaPublisher`, not `OMEventListenerPluginManager`. Update the link to avoid confusion. ```suggestion * Tests {@link OMEventListenerKafkaPublisher}. 
``` ########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of OMEventListener which uses the + * OMEventListenerLedgerPoller as a building block to periodically poll/consume + * completed operations, serialize them to a S3 schema and produce them + * to a kafka topic. 
+ */ +public class OMEventListenerKafkaPublisher implements OMEventListener { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class); + + private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka."; + private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1; + + private OMEventListenerLedgerPoller ledgerPoller; + private KafkaClientWrapper kafkaClient; + private OMEventListenerLedgerPollerSeekPosition seekPosition; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX); + Properties kafkaProps = new Properties(); + kafkaProps.putAll(kafkaPropsMap); + + this.kafkaClient = new KafkaClientWrapper(kafkaProps); + + // TODO: these constants should be read from config + long kafkaServiceInterval = 2 * 1000; + long kafkaServiceTimeout = 300 * 1000; + + LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + + "serviceTimeout={}, kafkaProps={}, seekPosition={}", + kafkaServiceInterval, kafkaServiceTimeout, kafkaProps, + seekPosition); + + this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(); + + this.ledgerPoller = new OMEventListenerLedgerPoller( + kafkaServiceInterval, TimeUnit.MILLISECONDS, + COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE, + kafkaServiceTimeout, pluginContext, conf, + seekPosition, + this::handleCompletedRequest); + } + + @Override + public void start() { + ledgerPoller.start(); + + try { + kafkaClient.initialize(); + } catch (IOException ex) { + LOG.error("Failure initializing kafka client", ex); + } + } + + @Override + public void shutdown() { + try { + kafkaClient.shutdown(); + } catch (IOException ex) { + LOG.error("Failure shutting down kafka client", ex); + } + + ledgerPoller.shutdown(); + } + + // callback called by OMEventListenerLedgerPoller + public void handleCompletedRequest(OmCompletedRequestInfo 
completedRequestInfo) { + LOG.info("Processing {}", completedRequestInfo); + + // stub event until we implement a strategy to convert the events to + // a user facing schema (e.g. S3) + String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}", + completedRequestInfo.getVolumeName(), + completedRequestInfo.getBucketName(), + completedRequestInfo.getKeyName(), + String.valueOf(completedRequestInfo.getCmdType())); + + LOG.info("Sending {}", event); + + try { + kafkaClient.send(event); + } catch (IOException ex) { + LOG.error("Failure to send event {}", event, ex); + return; + } + + // we can update the seek position + seekPosition.set(String.valueOf(completedRequestInfo.getTrxLogIndex())); + } + + static class KafkaClientWrapper { + public static final Logger LOG = LoggerFactory.getLogger(KafkaClientWrapper.class); + + private final String topic; + private final Properties kafkaProps; + + private KafkaProducer<String, String> producer; + + KafkaClientWrapper(Properties kafkaProps) { + this.topic = (String) kafkaProps.get("topic"); + this.kafkaProps = kafkaProps; + } + + public void initialize() throws IOException { + LOG.info("Initializing with properties {}", kafkaProps); + this.producer = new KafkaProducer<>(kafkaProps); + + ensureTopicExists(); + } + + public void shutdown() throws IOException { + producer.close(); Review Comment: `KafkaClientWrapper.shutdown()` calls `producer.close()` without a null check. If initialization failed (or `start()` was never called), this can throw NPE and is not caught by the caller (it only catches `IOException`). Guard `producer` and make shutdown idempotent. 
```suggestion KafkaProducer<String, String> currentProducer = producer; producer = null; if (currentProducer != null) { currentProducer.close(); } ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a manager for plugins which implement OMEventListener which + * manages the lifecycle of constructing starting/stopping configured + * plugins. 
+ */ +public class OMEventListenerPluginManager { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerPluginManager.class); + + public static final String PLUGIN_DEST_BASE = "ozone.om.plugin.destination"; + + private final List<OMEventListener> plugins; + + public OMEventListenerPluginManager(OzoneManager ozoneManager, OzoneConfiguration conf) { + this.plugins = loadAll(ozoneManager, conf); + } + + public List<OMEventListener> getLoaded() { + return plugins; + } + + public void startAll() { + for (OMEventListener plugin : plugins) { + plugin.start(); + } + } + + public void shutdownAll() { + for (OMEventListener plugin : plugins) { + plugin.shutdown(); + } + } + + // Configuration is based on ranger plugins + // + // For example, a plugin named FooPlugin would be configured via + // OzoneConfiguration properties as follows: + // + // conf.set("ozone.om.plugin.destination.foo", "enabled"); + // conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"); + // + static List<OMEventListener> loadAll(OzoneManager ozoneManager, OzoneConfiguration conf) { + List<OMEventListener> plugins = new ArrayList<>(); + + Map<String, String> props = conf.getPropsMatchPrefixAndTrimPrefix(PLUGIN_DEST_BASE); + List<String> destNameList = new ArrayList<>(); + for (Map.Entry<String, String> entry : props.entrySet()) { + String destName = entry.getKey(); + String value = entry.getValue(); + LOG.info("Found event listener plugin with name={} and value={}", destName, value); + + if (value.equalsIgnoreCase("enable") || value.equalsIgnoreCase("enabled") || value.equalsIgnoreCase("true")) { + destNameList.add(destName); + LOG.info("Event listener plugin {}{} is set to {}", PLUGIN_DEST_BASE, destName, value); + } + } + + OMEventListenerPluginContext pluginContext = new OMEventListenerPluginContextImpl(ozoneManager); + + for (String destName : destNameList) { + try { + Class<? 
extends OMEventListener> cls = resolvePluginClass(conf, destName); + LOG.info("Event listener plugin class is {}", cls); + + OMEventListener impl = cls.newInstance(); + impl.initialize(conf, pluginContext); + + plugins.add(impl); + } catch (Exception ex) { + LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex); Review Comment: `Class#newInstance()` is deprecated and can mask constructor exceptions. Prefer `cls.getDeclaredConstructor().newInstance()` and handle the reflective exceptions explicitly. ```suggestion OMEventListener impl = cls.getDeclaredConstructor().newInstance(); impl.initialize(conf, pluginContext); plugins.add(impl); } catch (ReflectiveOperationException ex) { LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex); } catch (Exception ex) { LOG.error("Can't initialize event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex); ``` ########## hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a helper class which can be used by implementations of + * OMEventListener which uses a background service to read the latest + * completed operations and hand them to a callback method. + */ +public class OMEventListenerLedgerPoller extends BackgroundService { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerLedgerPoller.class); + + private static final int MAX_RESULTS = 10_000; + + private final AtomicBoolean suspended; + private final AtomicLong runCount; + private final AtomicLong successRunCount; + private final OMEventListenerPluginContext pluginContext; + private final OMEventListenerLedgerPollerSeekPosition seekPosition; + private final Consumer<OmCompletedRequestInfo> callback; + + @SuppressWarnings("checkstyle:ParameterNumber") + public OMEventListenerLedgerPoller(long interval, TimeUnit unit, + int poolSize, long serviceTimeout, + OMEventListenerPluginContext pluginContext, + OzoneConfiguration configuration, + OMEventListenerLedgerPollerSeekPosition seekPosition, + Consumer<OmCompletedRequestInfo> callback) { + + super("OMEventListenerLedgerPoller", + interval, + TimeUnit.MILLISECONDS, Review Comment: The constructor takes a `TimeUnit unit` but the `BackgroundService` super constructor is called 
with `TimeUnit.MILLISECONDS`, ignoring the passed unit. Pass `unit` through (and convert `interval` appropriately) or remove the `unit` parameter to avoid incorrect scheduling. ```suggestion unit, ``` ########## hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.util.Time; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Tests {@link OMEventListenerPluginManager}. + */ +@ExtendWith(MockitoExtension.class) +public class TestOMEventListenerKafkaPublisher { + + private static final String VOLUME_NAME = "vol1"; + private static final String BUCKET_NAME = "bucket1"; + + @Mock + private OMEventListenerPluginContext pluginContext; + + // helper to create json key/val string for non exhaustive JSON + // attribute checking + private static String toJsonKeyVal(String key, String val) { + return new StringBuilder() + .append('\"') + .append(key) + .append('\"') + .append(':') + .append('\"') + .append(val) + .append('\"') + .toString(); + } + + private static OmCompletedRequestInfo buildCompletedRequestInfo( + long trxLogIndex, Type cmdType, String keyName, OperationArgs opArgs) { + + return new OmCompletedRequestInfo.Builder() + .setTrxLogIndex(trxLogIndex) + .setCmdType(cmdType) + .setVolumeName(VOLUME_NAME) + .setBucketName(BUCKET_NAME) + .setKeyName(keyName) + .setCreationTime(Time.now()) + .setOpArgs(new OperationArgs.NoArgs()) + .build(); + } + + private List<String> 
captureEventsProducedByOperation(OmCompletedRequestInfo op, int expectEvents) + throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.notify.kafka.topic", "abc"); + + List<String> events = new ArrayList<>(); + + OMEventListenerKafkaPublisher plugin = new OMEventListenerKafkaPublisher(); + try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockeKafkaClientWrapper = + mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) { + + plugin.initialize(conf, pluginContext); + plugin.handleCompletedRequest(op); + + OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockeKafkaClientWrapper.constructed().get(0); + ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class); + verify(mock, times(expectEvents)).send(argument.capture()); + + events.addAll(argument.getAllValues()); + } + + return events; + } + + @Test + public void testCreateKeyRequestProducesS3CreatedEvent() throws InterruptedException, IOException { + OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(1L, Type.CreateKey, "some/key1", + new OperationArgs.NoArgs()); + + List<String> events = captureEventsProducedByOperation(createRequest, 1); + assertThat(events).hasSize(1); + + assertThat(events.get(0)) + .contains(toJsonKeyVal("key", "vol1/bucket1/some/key1")) + .contains(toJsonKeyVal("type", "CreateKey")); + } + + @Test + public void testCreateFileRequestProducesS3CreatedEvent() throws InterruptedException, IOException { + boolean recursive = false; + boolean overwrite = true; + + OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(2L, Type.CreateFile, "some/key2", + new OperationArgs.CreateFileArgs(recursive, overwrite)); + + List<String> events = captureEventsProducedByOperation(createRequest, 1); + assertThat(events).hasSize(1); + + assertThat(events.get(0)) + .contains(toJsonKeyVal("key", "vol1/bucket1/some/key2")) + .contains(toJsonKeyVal("type", "CreateFile")); + } + + @Test + 
public void testCreateDirectoryRequestProducesS3CreatedEvent() throws InterruptedException, IOException { + OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(3L, Type.CreateDirectory, "some/key3", + new OperationArgs.NoArgs()); + + List<String> events = captureEventsProducedByOperation(createRequest, 1); + assertThat(events).hasSize(1); + + assertThat(events.get(0)) + .contains(toJsonKeyVal("key", "vol1/bucket1/some/key3")) + .contains(toJsonKeyVal("type", "CreateDirectory")); + } + + @Test + public void testRenameRequestProducesS3CreateAndDeleteEvents() throws InterruptedException, IOException { Review Comment: Test name mentions producing both create and delete events, but the assertions only validate a single event. Rename the test (or adjust expectations) so it reflects the behavior being verified. ```suggestion public void testRenameRequestProducesRenameKeyEvent() throws InterruptedException, IOException { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
