andymg3 commented on code in PR #12160:
URL: https://github.com/apache/kafka/pull/12160#discussion_r876312658


##########
metadata/src/main/java/org/apache/kafka/image/AclsDelta.java:
##########
@@ -61,7 +61,13 @@ public void replay(AccessControlEntryRecord record) {
     }
 
     public void replay(RemoveAccessControlEntryRecord record) {
-        changes.put(record.id(), Optional.empty());
+        if (image.acls().containsKey(record.id())) {
+            changes.put(record.id(), Optional.empty());
+        } else if (changes.containsKey(record.id())) {
+            changes.remove(record.id());

Review Comment:
   Do you mind clarifying? With the exception thrown below I think ACL removal 
is still not idempotent. If an ACL is added, then removed, then removed again, 
then the exception below will be thrown as it would have been removed from the 
Map. 



##########
metadata/src/main/java/org/apache/kafka/image/AclsDelta.java:
##########
@@ -61,7 +61,13 @@ public void replay(AccessControlEntryRecord record) {
     }
 
     public void replay(RemoveAccessControlEntryRecord record) {
-        changes.put(record.id(), Optional.empty());
+        if (image.acls().containsKey(record.id())) {
+            changes.put(record.id(), Optional.empty());
+        } else if (changes.containsKey(record.id())) {
+            changes.remove(record.id());
+        } else {
+            throw new RuntimeException("Failed to find existing ACL with ID " 
+ record.id() + " in either image or changes");
+        }

Review Comment:
   That's correct and that's the existing behavior. I'll add a comment 
clarifying that. 



##########
metadata/src/main/java/org/apache/kafka/image/AclsDelta.java:
##########
@@ -61,7 +61,13 @@ public void replay(AccessControlEntryRecord record) {
     }
 
     public void replay(RemoveAccessControlEntryRecord record) {
-        changes.put(record.id(), Optional.empty());
+        if (image.acls().containsKey(record.id())) {
+            changes.put(record.id(), Optional.empty());
+        } else if (changes.containsKey(record.id())) {
+            changes.remove(record.id());
+        } else {
+            throw new RuntimeException("Failed to find existing ACL with ID " 
+ record.id() + " in either image or changes");

Review Comment:
   I agree that exception is more appropriate. I went with `RuntimeException` 
because thats what we throw in `AclControlManager` 
(https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L202)
 when we come across a removal unexpectedly. My thought process was to be 
consistent. Having said that, they are two different components so I'm fine 
updating it here to `IllegalStateException`. Do you think we should throw the 
same exception in `AclControlManager`?  



##########
metadata/src/main/java/org/apache/kafka/image/AclsDelta.java:
##########
@@ -61,7 +61,13 @@ public void replay(AccessControlEntryRecord record) {
     }
 
     public void replay(RemoveAccessControlEntryRecord record) {

Review Comment:
   Will add a java doc to this method.



##########
metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Timeout(40)
+public class AclsDeltaTest {
+
+    private Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
+
+    @Test
+    public void testRemovesDeleteIfNotInImage() {
+        AclsImage image = new AclsImage(Collections.emptyMap());
+        AclsDelta delta = new AclsDelta(image);
+        AccessControlEntryRecord inputAclRecord = 
testAccessControlEntryRecord();
+
+        assertEquals(0, delta.changes().size());
+
+        delta.replay(inputAclRecord);
+        assertTrue(delta.changes().containsKey(aclId));
+        assertTrue(delta.changes().get(aclId).isPresent());

Review Comment:
   True but `delta.changes().get(aclId)` could return `null` in which case we'd 
throw a NPE which would be a bit more challenging to debug from the test logs 
in my view so will leave as is unless you have concerns.



##########
metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Timeout(40)
+public class AclsDeltaTest {
+
+    private Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
+
+    @Test
+    public void testRemovesDeleteIfNotInImage() {
+        AclsImage image = new AclsImage(Collections.emptyMap());
+        AclsDelta delta = new AclsDelta(image);
+        AccessControlEntryRecord inputAclRecord = 
testAccessControlEntryRecord();
+
+        assertEquals(0, delta.changes().size());
+
+        delta.replay(inputAclRecord);
+        assertTrue(delta.changes().containsKey(aclId));
+        assertTrue(delta.changes().get(aclId).isPresent());
+        assertEquals(testStandardAcl(), delta.changes().get(aclId).get());
+
+        RemoveAccessControlEntryRecord inputRemoveAclRecord = 
testRemoveAccessControlEntryRecord();
+        delta.replay(inputRemoveAclRecord);
+
+        assertFalse(delta.changes().containsKey(aclId));
+    }
+
+    @Test
+    public void testKeepsDeleteIfInImage() {
+        Map<Uuid, StandardAcl> initialImageMap = new HashMap<>();
+        initialImageMap.put(aclId, testStandardAcl());
+        AclsImage image = new AclsImage(initialImageMap);
+        AclsDelta delta = new AclsDelta(image);
+
+        assertEquals(0, delta.changes().size());
+
+        RemoveAccessControlEntryRecord removeAccessControlEntryRecord = 
testRemoveAccessControlEntryRecord();
+        delta.replay(removeAccessControlEntryRecord);
+
+        assertTrue(delta.changes().containsKey(aclId));
+        assertEquals(Optional.empty(), delta.changes().get(aclId));
+    }
+
+    @Test
+    public void testThrowsExceptionOnInvalidState() {

Review Comment:
   Will add.



##########
metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Timeout(40)
+public class AclsDeltaTest {

Review Comment:
   Hmm yeah that's true. I guess I don't see a big downside of having a 
separate test class as obviously it is different to some degree. So will keep 
the separate test class unless you have concerns.  



##########
metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Timeout(40)
+public class AclsDeltaTest {
+
+    private Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
+
+    @Test
+    public void testRemovesDeleteIfNotInImage() {
+        AclsImage image = new AclsImage(Collections.emptyMap());
+        AclsDelta delta = new AclsDelta(image);
+        AccessControlEntryRecord inputAclRecord = 
testAccessControlEntryRecord();
+
+        assertEquals(0, delta.changes().size());
+
+        delta.replay(inputAclRecord);
+        assertTrue(delta.changes().containsKey(aclId));
+        assertTrue(delta.changes().get(aclId).isPresent());
+        assertEquals(testStandardAcl(), delta.changes().get(aclId).get());

Review Comment:
   Agree that's cleaner and it prevents NPEs if `delta.changes().get(aclId)` 
returns `null`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to