METRON-1394 Create Rest endpoint to add the ACL for current user to kafka topics (MohanDV via merrimanr) closes apache/metron#895
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/e265b369 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/e265b369 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/e265b369 Branch: refs/heads/feature/METRON-1416-upgrade-solr Commit: e265b369192613335335514bc842f6aebe016d3e Parents: 5f08ba0 Author: MohanDV <mohan...@gmail.com> Authored: Wed Feb 21 08:32:01 2018 -0600 Committer: merrimanr <merrim...@apache.org> Committed: Wed Feb 21 08:32:01 2018 -0600 ---------------------------------------------------------------------- .../metron/rest/controller/KafkaController.java | 1 + .../metron/rest/service/KafkaService.java | 9 ++++++ .../rest/service/impl/KafkaServiceImpl.java | 33 ++++++++++++++++++-- .../rest/service/impl/KafkaServiceImplTest.java | 10 ++++++ 4 files changed, 51 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/e265b369/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java index d057ac4..d04e227 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java @@ -119,4 +119,5 @@ public class KafkaController { kafkaService.produceMessage(name, message); return new ResponseEntity<>(HttpStatus.OK); } + } http://git-wip-us.apache.org/repos/asf/metron/blob/e265b369/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java index da3b226..aa35c5d 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java @@ -68,4 +68,13 @@ public interface KafkaService { String getSampleMessage(String topic); void produceMessage(String topic, String message) throws RestException; + + + /** + * + * @param name The name of the Kafka topic to add the ACL. + * @return If topic was present true; otherwise false. + */ + boolean addACLToCurrentUser(String name); + } http://git-wip-us.apache.org/repos/asf/metron/blob/e265b369/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java index 4f232fb..ac001b5 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java @@ -17,11 +17,13 @@ */ package org.apache.metron.rest.service.impl; -import java.util.HashSet; +import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Set; +import java.util.HashSet; +import java.util.Map; import java.util.stream.Collectors; +import kafka.admin.AclCommand; import kafka.admin.AdminOperationException; import kafka.admin.AdminUtils$; import kafka.admin.RackAwareMode; @@ -33,12 +35,15 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.KafkaTopic; import org.apache.metron.rest.service.KafkaService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.security.core.userdetails.User; import org.springframework.stereotype.Service; /** @@ -83,6 +88,9 @@ public class KafkaServiceImpl implements KafkaService { if (!listTopics().contains(topic.getName())) { try { adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$); + if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)){ + addACLToCurrentUser(topic.getName()); + } } catch (AdminOperationException e) { throw new RestException(e); } @@ -154,4 +162,25 @@ public class KafkaServiceImpl implements KafkaService { public void produceMessage(String topic, String message) throws RestException { kafkaProducer.send(new ProducerRecord<>(topic, message)); } + + @Override + public boolean addACLToCurrentUser(String name){ + if(listTopics().contains(name)) { + String zkServers = environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY); + User principal = (User) SecurityContextHolder.getContext().getAuthentication().getPrincipal(); + String user = principal.getUsername(); + List<String> cmd = new ArrayList<>(); + cmd.add("--add"); + cmd.add("--allow-principal"); + cmd.add("User:" + user); + cmd.add("--topic"); + cmd.add(name); + cmd.add("--authorizer-properties"); + cmd.add("zookeeper.connect=" + String.join(",", zkServers)); + AclCommand.main(cmd.toArray(new String[cmd.size()])); + } else { + return false; + } + return true; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/e265b369/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java index 4527e8e..b99128a 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java @@ -63,6 +63,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.kafka.core.ConsumerFactory; + @SuppressWarnings("unchecked") @RunWith(PowerMockRunner.class) @PowerMockIgnore("javax.management.*") // resolve classloader conflict @@ -79,6 +80,7 @@ public class KafkaServiceImplTest { private KafkaService kafkaService; + private static final KafkaTopic VALID_KAFKA_TOPIC = new KafkaTopic() {{ setReplicationFactor(2); setNumPartitions(1); @@ -314,4 +316,12 @@ public class KafkaServiceImplTest { verify(kafkaProducer).send(new ProducerRecord<>(topicName, expectedMessage)); verifyZeroInteractions(kafkaProducer); } + + @Test + public void addACLtoNonExistingTopicShouldReturnFalse() throws Exception{ + when(kafkaConsumer.listTopics()).thenReturn(Maps.newHashMap()); + assertFalse(kafkaService.addACLToCurrentUser("non_existent_topic")); + } + + }