METRON-1394 Create Rest endpoint to add the ACL for current user to kafka 
topics  (MohanDV via merrimanr) closes apache/metron#895


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/e265b369
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/e265b369
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/e265b369

Branch: refs/heads/feature/METRON-1416-upgrade-solr
Commit: e265b369192613335335514bc842f6aebe016d3e
Parents: 5f08ba0
Author: MohanDV <mohan...@gmail.com>
Authored: Wed Feb 21 08:32:01 2018 -0600
Committer: merrimanr <merrim...@apache.org>
Committed: Wed Feb 21 08:32:01 2018 -0600

----------------------------------------------------------------------
 .../metron/rest/controller/KafkaController.java |  1 +
 .../metron/rest/service/KafkaService.java       |  9 ++++++
 .../rest/service/impl/KafkaServiceImpl.java     | 33 ++++++++++++++++++--
 .../rest/service/impl/KafkaServiceImplTest.java | 10 ++++++
 4 files changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/e265b369/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
index d057ac4..d04e227 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
@@ -119,4 +119,5 @@ public class KafkaController {
     kafkaService.produceMessage(name, message);
     return new ResponseEntity<>(HttpStatus.OK);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/e265b369/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
index da3b226..aa35c5d 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
@@ -68,4 +68,13 @@ public interface KafkaService {
   String getSampleMessage(String topic);
 
   void produceMessage(String topic, String message) throws RestException;
+
+
+  /**
+   *
+   * @param name The name of the Kafka topic to add the ACL.
+   * @return If topic was present true; otherwise false.
+   */
+  boolean addACLToCurrentUser(String name);
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/e265b369/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
index 4f232fb..ac001b5 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
@@ -17,11 +17,13 @@
  */
 package org.apache.metron.rest.service.impl;
 
-import java.util.HashSet;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.stream.Collectors;
+import kafka.admin.AclCommand;
 import kafka.admin.AdminOperationException;
 import kafka.admin.AdminUtils$;
 import kafka.admin.RackAwareMode;
@@ -33,12 +35,15 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.KafkaTopic;
 import org.apache.metron.rest.service.KafkaService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
 import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.security.core.userdetails.User;
 import org.springframework.stereotype.Service;
 
 /**
@@ -83,6 +88,9 @@ public class KafkaServiceImpl implements KafkaService {
     if (!listTopics().contains(topic.getName())) {
       try {
         adminUtils.createTopic(zkUtils, topic.getName(), 
topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), 
RackAwareMode.Disabled$.MODULE$);
+        if 
(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, 
Boolean.class, false)){
+          addACLToCurrentUser(topic.getName());
+        }
       } catch (AdminOperationException e) {
         throw new RestException(e);
       }
@@ -154,4 +162,25 @@ public class KafkaServiceImpl implements KafkaService {
   public void produceMessage(String topic, String message) throws 
RestException {
     kafkaProducer.send(new ProducerRecord<>(topic, message));
   }
+
+  @Override
+  public boolean addACLToCurrentUser(String name){
+    if(listTopics().contains(name)) {
+      String zkServers = 
environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY);
+      User principal = (User) 
SecurityContextHolder.getContext().getAuthentication().getPrincipal();
+      String user = principal.getUsername();
+      List<String> cmd = new ArrayList<>();
+      cmd.add("--add");
+      cmd.add("--allow-principal");
+      cmd.add("User:" + user);
+      cmd.add("--topic");
+      cmd.add(name);
+      cmd.add("--authorizer-properties");
+      cmd.add("zookeeper.connect=" + String.join(",", zkServers));
+      AclCommand.main(cmd.toArray(new String[cmd.size()]));
+    } else {
+      return false;
+    }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/e265b369/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
index 4527e8e..b99128a 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
@@ -63,6 +63,7 @@ import 
org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.springframework.kafka.core.ConsumerFactory;
 
+
 @SuppressWarnings("unchecked")
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*") // resolve classloader conflict
@@ -79,6 +80,7 @@ public class KafkaServiceImplTest {
 
   private KafkaService kafkaService;
 
+
   private static final KafkaTopic VALID_KAFKA_TOPIC = new KafkaTopic() {{
     setReplicationFactor(2);
     setNumPartitions(1);
@@ -314,4 +316,12 @@ public class KafkaServiceImplTest {
     verify(kafkaProducer).send(new ProducerRecord<>(topicName, 
expectedMessage));
     verifyZeroInteractions(kafkaProducer);
   }
+
+  @Test
+  public void addACLtoNonExistingTopicShouldReturnFalse() throws Exception{
+    when(kafkaConsumer.listTopics()).thenReturn(Maps.newHashMap());
+    assertFalse(kafkaService.addACLToCurrentUser("non_existent_topic"));
+  }
+
+
 }

Reply via email to