porcelli commented on code in PR #6603: URL: https://github.com/apache/incubator-kie-drools/pull/6603#discussion_r2879327221
########## drools-audit/src/main/java/org/drools/audit/AuditTrailConfiguration.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.drools.audit; + +import jakarta.persistence.EntityManagerFactory; Review Comment: Hard import of EntityManagerFactory. This class will fail to load at runtime without JPA on the classpath, even for in-memory-only users. Consider isolating JPA into a separate builder class. ########## drools-audit/src/main/java/org/drools/audit/store/InMemoryAuditStore.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.drools.audit.store; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.drools.audit.event.AuditEvent; +import org.drools.audit.event.AuditEventType; +import org.drools.audit.event.RuleFiredEvent; + +/** + * Thread-safe, bounded in-memory audit store suitable for development, + * testing, and short-lived sessions. Events exceeding {@code maxCapacity} + * cause the oldest entries to be evicted. 
+ */ +public class InMemoryAuditStore implements AuditStore { + + private static final int DEFAULT_MAX_CAPACITY = 100_000; + + private static final Comparator<AuditEvent> TIMESTAMP_ORDER = + Comparator.comparing(AuditEvent::getTimestamp) + .thenComparing(AuditEvent::getSessionId) + .thenComparingLong(AuditEvent::getSequenceNumber); + + private final ConcurrentLinkedDeque<AuditEvent> events = new ConcurrentLinkedDeque<>(); + private final AtomicInteger size = new AtomicInteger(0); + private final int maxCapacity; + + public InMemoryAuditStore() { + this(DEFAULT_MAX_CAPACITY); + } + + public InMemoryAuditStore(int maxCapacity) { + if (maxCapacity <= 0) { + throw new IllegalArgumentException("maxCapacity must be positive"); + } + this.maxCapacity = maxCapacity; + } + + @Override + public void store(AuditEvent event) { + events.addLast(event); + int currentSize = size.incrementAndGet(); + while (currentSize > maxCapacity) { + if (events.pollFirst() != null) { + currentSize = size.decrementAndGet(); + } else { + break; + } + } + } + + @Override + public List<AuditEvent> findBySessionId(String sessionId) { + return events.stream() + .filter(e -> e.getSessionId().equals(sessionId)) + .sorted() + .collect(Collectors.toList()); + } + + @Override + public List<AuditEvent> findBySessionIdAndType(String sessionId, AuditEventType type) { + return events.stream() + .filter(e -> e.getSessionId().equals(sessionId) && e.getType() == type) + .sorted() + .collect(Collectors.toList()); + } + + @Override + public List<AuditEvent> findByTimeRange(Instant from, Instant to) { + return events.stream() + .filter(e -> !e.getTimestamp().isBefore(from) && !e.getTimestamp().isAfter(to)) + .sorted(TIMESTAMP_ORDER) + .collect(Collectors.toList()); + } + + @Override + public List<AuditEvent> findByRuleName(String ruleName) { + return events.stream() + .filter(e -> e instanceof RuleFiredEvent) + .map(e -> (RuleFiredEvent) e) + .filter(e -> ruleName.equals(e.getRuleName())) + .sorted() + 
.collect(Collectors.toList()); + } Review Comment: The two stores (this one and JPA) seem to return different results for the same findByRuleName() call. ########## drools-audit/src/main/java/org/drools/audit/listener/AuditEventListener.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.drools.audit.listener; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.drools.audit.event.AgendaOperationEvent; +import org.drools.audit.event.AuditEventType; +import org.drools.audit.event.FactOperationEvent; +import org.drools.audit.event.RuleFiredEvent; +import org.drools.audit.store.AuditStore; +import org.drools.core.common.InternalFactHandle; +import org.kie.api.event.rule.AfterMatchFiredEvent; +import org.kie.api.event.rule.AgendaGroupPoppedEvent; +import org.kie.api.event.rule.AgendaGroupPushedEvent; +import org.kie.api.event.rule.BeforeMatchFiredEvent; +import org.kie.api.event.rule.DefaultAgendaEventListener; +import org.kie.api.event.rule.DefaultRuleRuntimeEventListener; +import org.kie.api.event.rule.MatchCancelledEvent; +import org.kie.api.event.rule.MatchCreatedEvent; +import org.kie.api.event.rule.ObjectDeletedEvent; +import org.kie.api.event.rule.ObjectInsertedEvent; +import org.kie.api.event.rule.ObjectUpdatedEvent; +import org.kie.api.event.rule.RuleFlowGroupActivatedEvent; +import org.kie.api.event.rule.RuleFlowGroupDeactivatedEvent; +import org.kie.api.runtime.KieSession; +import org.kie.api.runtime.rule.FactHandle; +import org.kie.api.runtime.rule.Match; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Composite listener that attaches to a KieSession as both an + * {@link org.kie.api.event.rule.AgendaEventListener} and a + * {@link org.kie.api.event.rule.RuleRuntimeEventListener}, recording + * every rule evaluation and fact mutation into an {@link AuditStore}. 
+ * + * <p>Thread-safe: the sequence counter is atomic and the store + * implementation is responsible for its own concurrency guarantees.</p> + */ +public class AuditEventListener { + + private static final Logger LOG = LoggerFactory.getLogger(AuditEventListener.class); + + private final AuditStore store; + private final String sessionId; + private final AtomicLong sequenceCounter = new AtomicLong(0); + private final AgendaListener agendaListener = new AgendaListener(); + private final RuntimeListener runtimeListener = new RuntimeListener(); + + public AuditEventListener(AuditStore store, String sessionId) { + this.store = store; + this.sessionId = sessionId; + } + + /** + * Registers both listener facets on the given session. + */ + public void attach(KieSession session) { + session.addEventListener(agendaListener); + session.addEventListener(runtimeListener); + LOG.debug("Audit listener attached to session {}", sessionId); + } + + /** + * Removes both listener facets from the given session. + */ + public void detach(KieSession session) { + session.removeEventListener(agendaListener); + session.removeEventListener(runtimeListener); + LOG.debug("Audit listener detached from session {}", sessionId); + } + + public AgendaListener getAgendaListener() { + return agendaListener; + } + + public RuntimeListener getRuntimeListener() { + return runtimeListener; + } + + public String getSessionId() { + return sessionId; + } + + private long nextSeq() { + return sequenceCounter.incrementAndGet(); + } + + private Map<String, String> extractDeclarations(Match match) { + Map<String, String> decls = new LinkedHashMap<>(); + for (String declId : match.getDeclarationIds()) { + Object value = match.getDeclarationValue(declId); + decls.put(declId, value != null ? 
value.toString() : "null"); + } + return decls; + } + + private List<Long> extractFactHandleIds(Match match) { + List<Long> ids = new ArrayList<>(); + for (FactHandle fh : match.getFactHandles()) { + if (fh instanceof InternalFactHandle) { + ids.add(((InternalFactHandle) fh).getId()); + } + } + return ids; + } + + private String ruleName(org.kie.api.definition.rule.Rule rule) { + return rule != null ? rule.getName() : null; + } + + /** + * Inner listener for agenda (rule activation) events. + */ + public class AgendaListener extends DefaultAgendaEventListener { + + @Override + public void matchCreated(MatchCreatedEvent event) { + RuleFiredEvent auditEvent = new RuleFiredEvent( + AuditEventType.RULE_MATCH_CREATED, + sessionId, nextSeq(), + event.getMatch().getRule().getName(), + event.getMatch().getRule().getPackageName(), + extractDeclarations(event.getMatch()), + extractFactHandleIds(event.getMatch()), + event.getMatch().getSalience()); + store.store(auditEvent); + } + + @Override + public void matchCancelled(MatchCancelledEvent event) { + RuleFiredEvent auditEvent = new RuleFiredEvent( + AuditEventType.RULE_MATCH_CANCELLED, + sessionId, nextSeq(), + event.getMatch().getRule().getName(), + event.getMatch().getRule().getPackageName(), + extractDeclarations(event.getMatch()), + extractFactHandleIds(event.getMatch()), + event.getMatch().getSalience()); + store.store(auditEvent); + } + + @Override + public void beforeMatchFired(BeforeMatchFiredEvent event) { + // captured for timing; rule-fired is recorded on afterMatchFired + } + + @Override + public void afterMatchFired(AfterMatchFiredEvent event) { + RuleFiredEvent auditEvent = new RuleFiredEvent( + AuditEventType.RULE_FIRED, + sessionId, nextSeq(), + event.getMatch().getRule().getName(), + event.getMatch().getRule().getPackageName(), + extractDeclarations(event.getMatch()), + extractFactHandleIds(event.getMatch()), + event.getMatch().getSalience()); + store.store(auditEvent); + LOG.trace("Audit: rule fired [{}] in 
session {}", event.getMatch().getRule().getName(), sessionId); + } + + @Override + public void agendaGroupPushed(AgendaGroupPushedEvent event) { + store.store(new AgendaOperationEvent( + AuditEventType.AGENDA_GROUP_PUSHED, + sessionId, nextSeq(), + event.getAgendaGroup().getName())); + } + + @Override + public void agendaGroupPopped(AgendaGroupPoppedEvent event) { + store.store(new AgendaOperationEvent( + AuditEventType.AGENDA_GROUP_POPPED, + sessionId, nextSeq(), + event.getAgendaGroup().getName())); + } + + @Override + public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event) { + store.store(new AgendaOperationEvent( + AuditEventType.RULEFLOW_GROUP_ACTIVATED, + sessionId, nextSeq(), + event.getRuleFlowGroup().getName())); + } + + @Override + public void afterRuleFlowGroupDeactivated(RuleFlowGroupDeactivatedEvent event) { + store.store(new AgendaOperationEvent( + AuditEventType.RULEFLOW_GROUP_DEACTIVATED, + sessionId, nextSeq(), + event.getRuleFlowGroup().getName())); + } + } + + /** + * Inner listener for fact lifecycle (working memory) events. + */ + public class RuntimeListener extends DefaultRuleRuntimeEventListener { + + @Override + public void objectInserted(ObjectInsertedEvent event) { + Object obj = event.getObject(); Review Comment: Maybe check for null here, and in the other parts of the code that deal with the event? ########## drools-audit/src/main/java/org/drools/audit/event/AuditEvent.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.drools.audit.event; + +import java.io.Serializable; +import java.time.Instant; +import java.util.Objects; +import java.util.UUID; + +/** + * Immutable base class for all audit trail events. + * Each event captures a single atomic operation within the rule engine, + * tagged with a correlation ID for session-level grouping and a monotonically + * increasing sequence number for total ordering within a session. + */ +public abstract class AuditEvent implements Serializable, Comparable<AuditEvent> { + + private static final long serialVersionUID = 1L; + + private final String id; + private final AuditEventType type; + private final Instant timestamp; + private final String sessionId; + private final long sequenceNumber; + + protected AuditEvent(AuditEventType type, String sessionId, long sequenceNumber) { + this.id = UUID.randomUUID().toString(); + this.type = Objects.requireNonNull(type, "type"); + this.timestamp = Instant.now(); + this.sessionId = Objects.requireNonNull(sessionId, "sessionId"); + this.sequenceNumber = sequenceNumber; + } + + /** + * Hydration constructor for reconstructing a persisted event + * with its original identity and timestamp. 
+ */ + protected AuditEvent(String id, AuditEventType type, Instant timestamp, + String sessionId, long sequenceNumber) { + this.id = Objects.requireNonNull(id, "id"); + this.type = Objects.requireNonNull(type, "type"); + this.timestamp = Objects.requireNonNull(timestamp, "timestamp"); + this.sessionId = Objects.requireNonNull(sessionId, "sessionId"); + this.sequenceNumber = sequenceNumber; + } + + public String getId() { + return id; + } + + public AuditEventType getType() { + return type; + } + + public Instant getTimestamp() { + return timestamp; + } + + public String getSessionId() { + return sessionId; + } + + public long getSequenceNumber() { + return sequenceNumber; + } + + @Override + public int compareTo(AuditEvent other) { + int cmp = this.sessionId.compareTo(other.sessionId); + if (cmp != 0) { + return cmp; + } + return Long.compare(this.sequenceNumber, other.sequenceNumber); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof AuditEvent)) return false; + AuditEvent that = (AuditEvent) o; + return Objects.equals(id, that.id); Review Comment: I think this is breaking Comparable contract ########## drools-audit/src/main/java/org/drools/audit/store/InMemoryAuditStore.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.drools.audit.store; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.drools.audit.event.AuditEvent; +import org.drools.audit.event.AuditEventType; +import org.drools.audit.event.RuleFiredEvent; + +/** + * Thread-safe, bounded in-memory audit store suitable for development, + * testing, and short-lived sessions. Events exceeding {@code maxCapacity} + * cause the oldest entries to be evicted. + */ +public class InMemoryAuditStore implements AuditStore { + + private static final int DEFAULT_MAX_CAPACITY = 100_000; + + private static final Comparator<AuditEvent> TIMESTAMP_ORDER = + Comparator.comparing(AuditEvent::getTimestamp) + .thenComparing(AuditEvent::getSessionId) + .thenComparingLong(AuditEvent::getSequenceNumber); + + private final ConcurrentLinkedDeque<AuditEvent> events = new ConcurrentLinkedDeque<>(); + private final AtomicInteger size = new AtomicInteger(0); + private final int maxCapacity; + + public InMemoryAuditStore() { + this(DEFAULT_MAX_CAPACITY); + } + + public InMemoryAuditStore(int maxCapacity) { + if (maxCapacity <= 0) { + throw new IllegalArgumentException("maxCapacity must be positive"); + } + this.maxCapacity = maxCapacity; + } + + @Override + public void store(AuditEvent event) { + events.addLast(event); + int currentSize = size.incrementAndGet(); + while (currentSize > maxCapacity) { + if (events.pollFirst() != null) { + currentSize = size.decrementAndGet(); + } else { + break; + } + } + } + + @Override + public List<AuditEvent> findBySessionId(String sessionId) { + return events.stream() + .filter(e -> e.getSessionId().equals(sessionId)) + .sorted() + 
.collect(Collectors.toList()); + } + + @Override + public List<AuditEvent> findBySessionIdAndType(String sessionId, AuditEventType type) { + return events.stream() + .filter(e -> e.getSessionId().equals(sessionId) && e.getType() == type) + .sorted() + .collect(Collectors.toList()); + } + + @Override + public List<AuditEvent> findByTimeRange(Instant from, Instant to) { + return events.stream() + .filter(e -> !e.getTimestamp().isBefore(from) && !e.getTimestamp().isAfter(to)) + .sorted(TIMESTAMP_ORDER) + .collect(Collectors.toList()); + } + + @Override + public List<AuditEvent> findByRuleName(String ruleName) { + return events.stream() + .filter(e -> e instanceof RuleFiredEvent) + .map(e -> (RuleFiredEvent) e) + .filter(e -> ruleName.equals(e.getRuleName())) + .sorted() + .collect(Collectors.toList()); + } + + @Override + public List<AuditEvent> findBySessionIdAndTimeRange(String sessionId, Instant from, Instant to) { + return events.stream() + .filter(e -> e.getSessionId().equals(sessionId) + && !e.getTimestamp().isBefore(from) + && !e.getTimestamp().isAfter(to)) + .sorted(TIMESTAMP_ORDER) + .collect(Collectors.toList()); + } + + @Override + public List<AuditEvent> findAll() { + List<AuditEvent> snapshot = new ArrayList<>(events); + Collections.sort(snapshot); + return snapshot; + } + + @Override + public long count() { + return size.get(); + } + + @Override + public long countBySessionId(String sessionId) { + return events.stream() + .filter(e -> e.getSessionId().equals(sessionId)) + .count(); + } + + @Override + public void deleteBySessionId(String sessionId) { + events.removeIf(e -> { + if (e.getSessionId().equals(sessionId)) { + size.decrementAndGet(); + return true; + } + return false; Review Comment: size.decrementAndGet() inside removeIf() is not atomic with the deque removal.
########## drools-audit/src/main/java/org/drools/audit/event/AuditEventType.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.drools.audit.event; + +public enum AuditEventType { + + RULE_FIRED("RULE_FIRED", "rule"), + RULE_MATCH_CREATED("RULE_MATCH_CREATED", "rule"), + RULE_MATCH_CANCELLED("RULE_MATCH_CANCELLED", "rule"), + + FACT_INSERTED("FACT_INSERTED", "fact"), + FACT_UPDATED("FACT_UPDATED", "fact"), + FACT_DELETED("FACT_DELETED", "fact"), + + AGENDA_GROUP_PUSHED("AGENDA_GROUP_PUSHED", "agenda"), + AGENDA_GROUP_POPPED("AGENDA_GROUP_POPPED", "agenda"), + RULEFLOW_GROUP_ACTIVATED("RULEFLOW_GROUP_ACTIVATED", "agenda"), + RULEFLOW_GROUP_DEACTIVATED("RULEFLOW_GROUP_DEACTIVATED", "agenda"), + + SESSION_CREATED("SESSION_CREATED", "session"), + SESSION_DISPOSED("SESSION_DISPOSED", "session"), + SESSION_FIRE_ALL_RULES("SESSION_FIRE_ALL_RULES", "session"); Review Comment: This is only used for tests. Is this correct? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
