jsancio commented on code in PR #12160:
URL: https://github.com/apache/kafka/pull/12160#discussion_r876160871


##########
metadata/src/main/java/org/apache/kafka/image/AclsDelta.java:
##########
@@ -61,7 +61,13 @@ public void replay(AccessControlEntryRecord record) {
     }
 
     public void replay(RemoveAccessControlEntryRecord record) {
-        changes.put(record.id(), Optional.empty());
+        if (image.acls().containsKey(record.id())) {
+            changes.put(record.id(), Optional.empty());
+        } else if (changes.containsKey(record.id())) {
+            changes.remove(record.id());

Review Comment:
   Btw, the Jira mentions that we don't have "remove acl" to be idempotent.
   "Remove acl" is idempotent if the removals are processed in the same batch/delta.



##########
metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Timeout(40)
+public class AclsDeltaTest {
+
+    private Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
+
+    @Test
+    public void testRemovesDeleteIfNotInImage() {
+        AclsImage image = new AclsImage(Collections.emptyMap());
+        AclsDelta delta = new AclsDelta(image);
+        AccessControlEntryRecord inputAclRecord = testAccessControlEntryRecord();
+
+        assertEquals(0, delta.changes().size());
+
+        delta.replay(inputAclRecord);
+        assertTrue(delta.changes().containsKey(aclId));
+        assertTrue(delta.changes().get(aclId).isPresent());

Review Comment:
   I think you can remove this since you are implicitly testing this in the 
line below.



##########
metadata/src/main/java/org/apache/kafka/image/AclsDelta.java:
##########
@@ -61,7 +61,13 @@ public void replay(AccessControlEntryRecord record) {
     }
 
     public void replay(RemoveAccessControlEntryRecord record) {
-        changes.put(record.id(), Optional.empty());
+        if (image.acls().containsKey(record.id())) {
+            changes.put(record.id(), Optional.empty());
+        } else if (changes.containsKey(record.id())) {
+            changes.remove(record.id());
+        } else {
+            throw new RuntimeException("Failed to find existing ACL with ID " + record.id() + " in either image or changes");

Review Comment:
   Can we throw an `IllegalStateException`?
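
   A minimal sketch of what that could look like, assuming plain `String` ids and values in place of `Uuid` and `StandardAcl`. The class name `SketchAclsDelta` and its method names are hypothetical stand-ins, not the PR's code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for AclsDelta: String ids and values replace Uuid and StandardAcl.
class SketchAclsDelta {
    private final Map<String, String> image;  // ACLs already present in the image
    private final Map<String, Optional<String>> changes = new HashMap<>();

    SketchAclsDelta(Map<String, String> image) {
        this.image = image;
    }

    void replayAdd(String id, String acl) {
        changes.put(id, Optional.of(acl));
    }

    void replayRemove(String id) {
        if (image.containsKey(id)) {
            // The ACL exists in the image: record a deletion.
            changes.put(id, Optional.empty());
        } else if (changes.containsKey(id)) {
            // The ACL was only added in this delta: the add and the remove cancel out.
            changes.remove(id);
        } else {
            // Neither the image nor the delta knows this id.
            throw new IllegalStateException("Failed to find existing ACL with ID " + id
                + " in either image or changes");
        }
    }

    Map<String, Optional<String>> changes() {
        return changes;
    }
}
```

   In this sketch, removing an id that is in the image twice within one delta leaves a single `Optional.empty()` entry, while a second removal of an id that only ever existed in the delta hits the `IllegalStateException` branch.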



##########
metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java:
##########
@@ -0,0 +1,108 @@
+package org.apache.kafka.image;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Timeout(40)
+public class AclsDeltaTest {

Review Comment:
   I am okay moving these tests to `AclImageTest`. The two types are closely
   related. We already have "delta" tests in the "image" suite.



##########
metadata/src/main/java/org/apache/kafka/image/AclsDelta.java:
##########
@@ -61,7 +61,13 @@ public void replay(AccessControlEntryRecord record) {
     }
 
     public void replay(RemoveAccessControlEntryRecord record) {

Review Comment:
   Can we document this method? Seems subtle enough to warrant documentation.



##########
metadata/src/main/java/org/apache/kafka/image/AclsDelta.java:
##########
@@ -61,7 +61,13 @@ public void replay(AccessControlEntryRecord record) {
     }
 
     public void replay(RemoveAccessControlEntryRecord record) {
-        changes.put(record.id(), Optional.empty());
+        if (image.acls().containsKey(record.id())) {
+            changes.put(record.id(), Optional.empty());
+        } else if (changes.containsKey(record.id())) {
+            changes.remove(record.id());
+        } else {
+            throw new RuntimeException("Failed to find existing ACL with ID " + record.id() + " in either image or changes");
+        }

Review Comment:
   This comment is for:
   ```java
   public Map<Uuid, Optional<StandardAcl>> changes() {
       return changes;
   }
   ```
   Can we document this method? I am particularly interested in the return
   type. I get the impression that a value of `Optional.empty` should be
   interpreted as a delete.
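
   To make that reading concrete, here is a small sketch of how a consumer might apply such a map, assuming the contract really is "present value means add or replace, `Optional.empty()` means delete". `SketchChangesApplier` and its `String`-based types are hypothetical and stand in for `Uuid` and `StandardAcl`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; assumes Optional.empty() marks a deletion.
class SketchChangesApplier {
    /**
     * Applies a delta's changes to a copy of the image.
     * A present value adds or replaces the ACL; Optional.empty() deletes it.
     */
    static Map<String, String> apply(Map<String, String> image,
                                     Map<String, Optional<String>> changes) {
        Map<String, String> result = new HashMap<>(image);  // leave the input image untouched
        for (Map.Entry<String, Optional<String>> entry : changes.entrySet()) {
            if (entry.getValue().isPresent()) {
                result.put(entry.getKey(), entry.getValue().get());
            } else {
                result.remove(entry.getKey());
            }
        }
        return result;
    }
}
```

   If that is the intended meaning, a sentence in the Javadoc of `changes()` saying so would save readers from having to infer it.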



##########
metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java:
##########
@@ -0,0 +1,108 @@
+package org.apache.kafka.image;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Timeout(40)
+public class AclsDeltaTest {
+
+    private Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
+
+    @Test
+    public void testRemovesDeleteIfNotInImage() {
+        AclsImage image = new AclsImage(Collections.emptyMap());
+        AclsDelta delta = new AclsDelta(image);
+        AccessControlEntryRecord inputAclRecord = testAccessControlEntryRecord();
+
+        assertEquals(0, delta.changes().size());
+
+        delta.replay(inputAclRecord);
+        assertTrue(delta.changes().containsKey(aclId));
+        assertTrue(delta.changes().get(aclId).isPresent());
+        assertEquals(testStandardAcl(), delta.changes().get(aclId).get());

Review Comment:
   You can remove the `get()` and compare it against 
`Optional.of(testStandardAcl())`.
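
   The reason a single comparison is enough: `Optional.equals` returns false for `null` (missing key) and for an empty `Optional`, so it covers the `containsKey` and `isPresent` checks as well. A plain-Java sketch of that behavior, with a hypothetical `SketchOptionalComparison` helper and `String` values in place of `StandardAcl`:

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper illustrating why one Optional comparison replaces three assertions.
class SketchOptionalComparison {
    // True only if the key exists, its value is present, and the value equals `expected`.
    static boolean holds(Map<String, Optional<String>> changes, String id, String expected) {
        return Optional.of(expected).equals(changes.get(id));
    }
}
```

   In the test itself this would presumably become `assertEquals(Optional.of(testStandardAcl()), delta.changes().get(aclId));`.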



##########
metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java:
##########
@@ -0,0 +1,108 @@
+package org.apache.kafka.image;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Timeout(40)
+public class AclsDeltaTest {
+
+    private Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
+
+    @Test
+    public void testRemovesDeleteIfNotInImage() {
+        AclsImage image = new AclsImage(Collections.emptyMap());
+        AclsDelta delta = new AclsDelta(image);
+        AccessControlEntryRecord inputAclRecord = testAccessControlEntryRecord();
+
+        assertEquals(0, delta.changes().size());
+
+        delta.replay(inputAclRecord);
+        assertTrue(delta.changes().containsKey(aclId));
+        assertTrue(delta.changes().get(aclId).isPresent());
+        assertEquals(testStandardAcl(), delta.changes().get(aclId).get());
+
+        RemoveAccessControlEntryRecord inputRemoveAclRecord = testRemoveAccessControlEntryRecord();
+        delta.replay(inputRemoveAclRecord);
+
+        assertFalse(delta.changes().containsKey(aclId));
+    }
+
+    @Test
+    public void testKeepsDeleteIfInImage() {
+        Map<Uuid, StandardAcl> initialImageMap = new HashMap<>();
+        initialImageMap.put(aclId, testStandardAcl());
+        AclsImage image = new AclsImage(initialImageMap);
+        AclsDelta delta = new AclsDelta(image);
+
+        assertEquals(0, delta.changes().size());
+
+        RemoveAccessControlEntryRecord removeAccessControlEntryRecord = testRemoveAccessControlEntryRecord();
+        delta.replay(removeAccessControlEntryRecord);
+
+        assertTrue(delta.changes().containsKey(aclId));
+        assertEquals(Optional.empty(), delta.changes().get(aclId));
+    }
+
+    @Test
+    public void testThrowsExceptionOnInvalidState() {

Review Comment:
   Can we also test with an `AclsImage` that has ACLs, but not the one being
   removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

