Repository: incubator-atlas Updated Branches: refs/heads/master 07b8b4d3c -> 98769871e
ATLAS-629 Kafka messages in ATLAS_HOOK might be lost in HA mode at the instant of failover. (yhemanth) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/98769871 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/98769871 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/98769871 Branch: refs/heads/master Commit: 98769871e56d9a97792e2dba52345e876908ac63 Parents: 07b8b4d Author: Hemanth Yamijala <[email protected]> Authored: Fri May 13 09:48:20 2016 +0530 Committer: Hemanth Yamijala <[email protected]> Committed: Fri May 13 09:48:20 2016 +0530 ---------------------------------------------------------------------- distro/src/conf/atlas-application.properties | 1 + .../org/apache/atlas/kafka/KafkaConsumer.java | 27 +++- .../apache/atlas/kafka/KafkaNotification.java | 55 +++++--- .../notification/AbstractNotification.java | 9 +- .../AbstractNotificationConsumer.java | 2 + .../notification/NotificationConsumer.java | 9 ++ .../apache/atlas/kafka/KafkaConsumerTest.java | 55 +++++++- .../atlas/kafka/KafkaNotificationTest.java | 119 ++++++---------- .../AbstractNotificationConsumerTest.java | 5 + release-log.txt | 1 + .../main/resources/atlas-application.properties | 1 + .../notification/NotificationHookConsumer.java | 82 ++++++----- .../NotificationHookConsumerKafkaTest.java | 141 +++++++++++++++++++ .../NotificationHookConsumerTest.java | 36 +++++ 14 files changed, 397 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/distro/src/conf/atlas-application.properties ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties index 119865d..b2b62aa 100755 --- a/distro/src/conf/atlas-application.properties +++ b/distro/src/conf/atlas-application.properties @@ -59,6 +59,7 @@ atlas.kafka.zookeeper.sync.time.ms=20 atlas.kafka.auto.commit.interval.ms=1000 atlas.kafka.auto.offset.reset=smallest atlas.kafka.hook.group.id=atlas +atlas.kafka.auto.commit.enable=false ######### Hive Lineage Configs ######### http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java index f1c9742..270215d 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java @@ -19,6 +19,7 @@ package org.apache.atlas.kafka; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.apache.atlas.notification.AbstractNotificationConsumer; import org.apache.atlas.notification.MessageDeserializer; @@ -35,24 +36,29 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> { private final int consumerId; private final ConsumerIterator iterator; + private final ConsumerConnector consumerConnector; + private final boolean autoCommitEnabled; + private long lastSeenOffset; // ----- Constructors ---------------------------------------------------- /** * Create a Kafka consumer. - * - * @param type the notification type returned by this consumer * @param deserializer the message deserializer used for this consumer * @param stream the underlying Kafka stream * @param consumerId an id value for this consumer + * @param consumerConnector the {@link ConsumerConnector} which created the underlying Kafka stream + * @param autoCommitEnabled true if consumer does not need to commit offsets explicitly, false otherwise. */ - public KafkaConsumer(Class<T> type, - MessageDeserializer<T> deserializer, KafkaStream<String, String> stream, int consumerId) { + public KafkaConsumer(MessageDeserializer<T> deserializer, KafkaStream<String, String> stream, int consumerId, + ConsumerConnector consumerConnector, boolean autoCommitEnabled) { super(deserializer); - + this.consumerConnector = consumerConnector; + this.lastSeenOffset = 0; this.iterator = stream.iterator(); this.consumerId = consumerId; + this.autoCommitEnabled = autoCommitEnabled; } @@ -71,6 +77,7 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> { MessageAndMetadata message = iterator.next(); LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}", consumerId, message.topic(), message.partition(), message.offset(), message.message()); + lastSeenOffset = message.offset(); return (String) message.message(); } @@ -79,4 +86,14 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> { MessageAndMetadata message = (MessageAndMetadata) iterator.peek(); return (String) message.message(); } + + @Override + public void commit() { + if (autoCommitEnabled) { + LOG.debug("Auto commit is disabled, not committing."); + } else { + consumerConnector.commitOffsets(); + LOG.debug("Committed offset: {}", lastSeenOffset); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index cfffec4..1ee62d2 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -17,6 +17,7 @@ */ package org.apache.atlas.kafka; +import com.google.common.annotations.VisibleForTesting; import com.google.inject.Singleton; import kafka.consumer.Consumer; import kafka.consumer.KafkaStream; @@ -112,9 +113,6 @@ public class KafkaNotification extends AbstractNotification implements Service { "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, @@ -123,6 +121,10 @@ public class KafkaNotification extends AbstractNotification implements Service { properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest"); } + @VisibleForTesting + protected KafkaNotification(Properties properties) { + this.properties = properties; + } // ----- Service --------------------------------------------------------- @@ -159,26 +161,34 @@ public class KafkaNotification extends AbstractNotification implements Service { @Override public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) { + return createConsumers(notificationType, numConsumers, + Boolean.valueOf(properties.getProperty("auto.commit.enable", "true"))); + } + + @VisibleForTesting + public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, + int numConsumers, boolean autoCommitEnabled) { String topic = TOPIC_MAP.get(notificationType); Properties consumerProperties = getConsumerProperties(notificationType); - ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties); - Map<String, Integer> topicCountMap = new HashMap<>(); - topicCountMap.put(topic, numConsumers); - StringDecoder decoder = new StringDecoder(null); - Map<String, List<KafkaStream<String, String>>> streamsMap = - consumerConnector.createMessageStreams(topicCountMap, decoder, decoder); - List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic); List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers); - int consumerId = 0; - for (KafkaStream stream : kafkaConsumers) { - KafkaConsumer<T> kafkaConsumer = - createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(), - stream, consumerId++); - consumers.add(kafkaConsumer); + for (int i = 0; i < numConsumers; i++) { + ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties); + Map<String, Integer> topicCountMap = new HashMap<>(); + topicCountMap.put(topic, 1); + StringDecoder decoder = new StringDecoder(null); + Map<String, List<KafkaStream<String, String>>> streamsMap = + consumerConnector.createMessageStreams(topicCountMap, decoder, decoder); + List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic); + for (KafkaStream stream : kafkaConsumers) { + KafkaConsumer<T> kafkaConsumer = + createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(), + stream, i, consumerConnector, autoCommitEnabled); + consumers.add(kafkaConsumer); + } + consumerConnectors.add(consumerConnector); } - consumerConnectors.add(consumerConnector); return consumers; } @@ -245,12 +255,14 @@ public class KafkaNotification extends AbstractNotification implements Service { * @param stream the Kafka stream * @param consumerId the id for the new consumer * + * @param consumerConnector * @return a new Kafka consumer */ - protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, - MessageDeserializer<T> deserializer, KafkaStream stream, - int consumerId) { - return new org.apache.atlas.kafka.KafkaConsumer<T>(type, deserializer, stream, consumerId); + protected <T> org.apache.atlas.kafka.KafkaConsumer<T> + createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream, + int consumerId, ConsumerConnector consumerConnector, boolean autoCommitEnabled) { + return new org.apache.atlas.kafka.KafkaConsumer<T>(deserializer, stream, + consumerId, consumerConnector, autoCommitEnabled); } // Get properties for consumer request @@ -266,6 +278,7 @@ public class KafkaNotification extends AbstractNotification implements Service { consumerProperties.putAll(properties); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + LOG.info("Consumer property: auto.commit.enable: " + consumerProperties.getProperty("auto.commit.enable")); return consumerProperties; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index 7d22126..cb44fc6 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -17,6 +17,7 @@ */ package org.apache.atlas.notification; +import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; @@ -46,7 +47,7 @@ public abstract class AbstractNotification implements NotificationInterface { */ public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0"); - private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded"; + public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded"; private final boolean embedded; private final boolean isHAEnabled; @@ -59,7 +60,6 @@ public abstract class AbstractNotification implements NotificationInterface { registerTypeAdapter(JSONArray.class, new JSONArraySerializer()). create(); - // ----- Constructors ---------------------------------------------------- public AbstractNotification(Configuration applicationProperties) throws AtlasException { @@ -67,6 +67,11 @@ public abstract class AbstractNotification implements NotificationInterface { this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties); } + @VisibleForTesting + protected AbstractNotification() { + embedded = false; + isHAEnabled = false; + } // ----- NotificationInterface ------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java index f00bbca..d4d78de 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java @@ -68,4 +68,6 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon public T peek() { return deserializer.deserialize(peekMessage()); } + + public abstract void commit(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java index 78e8ce7..2e861cb 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java @@ -43,4 +43,13 @@ public interface NotificationConsumer<T> { * @return the next notification */ T peek(); + + /** + * Commit the offset of messages that have been successfully processed. + * + * This API should be called when messages read with {@link #next()} have been successfully processed and + * the consumer is ready to handle the next message, which could happen even after a normal or an abnormal + * restart. + */ + void commit(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java index 7f607c6..ad7d93e 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java @@ -20,6 +20,7 @@ package org.apache.atlas.kafka; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.apache.atlas.notification.AbstractNotification; import org.apache.atlas.notification.MessageVersion; @@ -33,6 +34,9 @@ import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; import org.codehaus.jettison.json.JSONException; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.Collections; @@ -41,6 +45,8 @@ import java.util.List; import java.util.NoSuchElementException; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.*; @@ -51,6 +57,14 @@ public class KafkaConsumerTest { private static final String TRAIT_NAME = "MyTrait"; + @Mock + private ConsumerConnector consumerConnector; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + @Test public void testNext() throws Exception { KafkaStream<String, String> stream = mock(KafkaStream.class); @@ -70,8 +84,9 @@ public class KafkaConsumerTest { when(messageAndMetadata.message()).thenReturn(json); NotificationConsumer<HookNotification.HookNotificationMessage> consumer = - new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(), - NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99); + new KafkaConsumer<>( + NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99, + consumerConnector, false); assertTrue(consumer.hasNext()); @@ -101,8 +116,9 @@ public class KafkaConsumerTest { when(messageAndMetadata.message()).thenReturn(json); NotificationConsumer<HookNotification.HookNotificationMessage> consumer = - new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(), - NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99); + new KafkaConsumer<>( + NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99, + consumerConnector, false); assertTrue(consumer.hasNext()); @@ -135,8 +151,9 @@ public class KafkaConsumerTest { when(messageAndMetadata.message()).thenReturn(json); NotificationConsumer<HookNotification.HookNotificationMessage> consumer = - new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(), - NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99); + new KafkaConsumer<>( + NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99, + consumerConnector, false); assertTrue(consumer.hasNext()); @@ -147,6 +164,32 @@ public class KafkaConsumerTest { assertTrue(consumer.hasNext()); } + @Test + public void testCommitIsCalledIfAutoCommitDisabled() { + KafkaStream<String, String> stream = mock(KafkaStream.class); + NotificationConsumer<HookNotification.HookNotificationMessage> consumer = + new KafkaConsumer<>( + NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99, + consumerConnector, false); + + consumer.commit(); + + verify(consumerConnector).commitOffsets(); + } + + @Test + public void testCommitIsNotCalledIfAutoCommitEnabled() { + KafkaStream<String, String> stream = mock(KafkaStream.class); + NotificationConsumer<HookNotification.HookNotificationMessage> consumer = + new KafkaConsumer<>( + NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99, + consumerConnector, true); + + consumer.commit(); + + verify(consumerConnector, never()).commitOffsets(); + } + private Referenceable getEntity(String traitName) { Referenceable entity = EntityNotificationImplTest.getEntity("id"); List<IStruct> traitInfo = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java index 17fda25..219bd70 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -17,26 +17,16 @@ */ package org.apache.atlas.kafka; -import com.google.inject.Inject; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; -import org.apache.atlas.AtlasException; import org.apache.atlas.notification.MessageDeserializer; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.NotificationModule; -import org.apache.commons.configuration.Configuration; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Guice; import org.testng.annotations.Test; -import java.util.Collections; +import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -44,99 +34,80 @@ import java.util.Properties; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -@Guice(modules = NotificationModule.class) public class KafkaNotificationTest { - @Inject - private KafkaNotification kafka; - - @BeforeClass - public void setUp() throws Exception { - kafka.start(); - } - @Test @SuppressWarnings("unchecked") public void testCreateConsumers() throws Exception { - Configuration configuration = mock(Configuration.class); - Iterator iterator = mock(Iterator.class); - ConsumerConnector consumerConnector = mock(ConsumerConnector.class); - KafkaStream kafkaStream1 = mock(KafkaStream.class); - KafkaStream kafkaStream2 = mock(KafkaStream.class); - String groupId = "groupId9999"; - - when(configuration.subset(KafkaNotification.PROPERTY_PREFIX)).thenReturn(configuration); - when(configuration.getKeys()).thenReturn(iterator); - when(iterator.hasNext()).thenReturn(true).thenReturn(false); - when(iterator.next()).thenReturn("entities." + KafkaNotification.CONSUMER_GROUP_ID_PROPERTY); - when(configuration.getList("entities." + KafkaNotification.CONSUMER_GROUP_ID_PROPERTY)) - .thenReturn(Collections.<Object>singletonList(groupId)); - - Map<String, List<KafkaStream<String, String>>> streamsMap = new HashMap<>(); - List<KafkaStream<String, String>> kafkaStreamList = new LinkedList<>(); - kafkaStreamList.add(kafkaStream1); - kafkaStreamList.add(kafkaStream2); - streamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreamList); + Properties properties = mock(Properties.class); + when(properties.getProperty("entities.group.id")).thenReturn("atlas"); + final ConsumerConnector consumerConnector = mock(ConsumerConnector.class); Map<String, Integer> topicCountMap = new HashMap<>(); - topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 2); + topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1); - when(consumerConnector.createMessageStreams( - eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(streamsMap); - - TestKafkaNotification kafkaNotification = new TestKafkaNotification(configuration, consumerConnector); + Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap = + new HashMap<>(); + List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>(); + KafkaStream kafkaStream = mock(KafkaStream.class); + kafkaStreams.add(kafkaStream); + kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams); - List<NotificationConsumer<String>> consumers = - kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2); + when(consumerConnector.createMessageStreams( + eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap); - assertEquals(2, consumers.size()); + final KafkaConsumer consumer1 = mock(KafkaConsumer.class); + final KafkaConsumer consumer2 = mock(KafkaConsumer.class); - // assert that all of the given kafka streams were used to create kafka consumers - List<KafkaStream> streams = kafkaNotification.kafkaStreams; - assertTrue(streams.contains(kafkaStream1)); - assertTrue(streams.contains(kafkaStream2)); + KafkaNotification kafkaNotification = + new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2); - // assert that the given consumer group id was added to the properties used to create the consumer connector - Properties properties = kafkaNotification.myProperties; - assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); - } + List<NotificationConsumer<String>> consumers = + kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2); - @AfterClass - public void teardown() throws Exception { - kafka.stop(); + verify(consumerConnector, times(2)).createMessageStreams( + eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class)); + assertEquals(consumers.size(), 2); + assertTrue(consumers.contains(consumer1)); + assertTrue(consumers.contains(consumer2)); } - // Extended kafka notification class for testing. - private static class TestKafkaNotification extends KafkaNotification { + class TestKafkaNotification extends KafkaNotification { private final ConsumerConnector consumerConnector; + private final KafkaConsumer consumer1; + private final KafkaConsumer consumer2; - private Properties myProperties; - private List<KafkaStream> kafkaStreams = new LinkedList<>(); - - public TestKafkaNotification(Configuration applicationProperties, - ConsumerConnector consumerConnector) throws AtlasException { - super(applicationProperties); + TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector, + KafkaConsumer consumer1, KafkaConsumer consumer2) { + super(properties); this.consumerConnector = consumerConnector; + this.consumer1 = consumer1; + this.consumer2 = consumer2; } @Override protected ConsumerConnector createConsumerConnector(Properties consumerProperties) { - this.myProperties = consumerProperties; - kafkaStreams.clear(); return consumerConnector; } @Override - protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, - MessageDeserializer<T> deserializer, - KafkaStream stream, - int consumerId) { - kafkaStreams.add(stream); - return super.createKafkaConsumer(type, deserializer, stream, consumerId); + protected <T> org.apache.atlas.kafka.KafkaConsumer<T> + createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream, + int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) { + if (consumerId == 0) { + return consumer1; + } else if (consumerId == 1) { + return consumer2; + } + return null; } + + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index e63175d..e8b55ef 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -253,6 +253,11 @@ public class AbstractNotificationConsumerTest { public boolean hasNext() { return index < messageList.size(); } + + @Override + public void commit() { + // do nothing. + } } private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index f68e86c..5d37d07 100644 --- a/release-log.txt +++ b/release-log.txt @@ -20,6 +20,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-629 Kafka messages in ATLAS_HOOK might be lost in HA mode at the instant of failover. (yhemanth) ATLAS-758 hdfs location of hive table is pointing to old location even after rename ( sumasai ) ATLAS-667 Entity delete should check for required reverse references ( dkantor via sumasai ) ATLAS-738 Add query ability on system properties like guid, state, createdtime etc (shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/typesystem/src/main/resources/atlas-application.properties ---------------------------------------------------------------------- diff --git a/typesystem/src/main/resources/atlas-application.properties b/typesystem/src/main/resources/atlas-application.properties index a343a20..aafad0f 100644 --- a/typesystem/src/main/resources/atlas-application.properties +++ b/typesystem/src/main/resources/atlas-application.properties @@ -70,6 +70,7 @@ atlas.kafka.consumer.timeout.ms=100 atlas.kafka.auto.commit.interval.ms=100 atlas.kafka.hook.group.id=atlas atlas.kafka.entities.group.id=atlas_entities +atlas.kafka.auto.commit.enable=false ######### Entity Audit Configs ######### atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 8ef2f64..901b1ed 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -17,6 +17,7 @@ */ package org.apache.atlas.notification; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -183,50 +184,55 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl while (shouldRun.get()) { try { if (hasNext()) { - HookNotification.HookNotificationMessage message = consumer.next(); - atlasClient.setUser(message.getUser()); - try { - switch (message.getType()) { - case ENTITY_CREATE: - HookNotification.EntityCreateRequest createRequest = - (HookNotification.EntityCreateRequest) message; - atlasClient.createEntity(createRequest.getEntities()); - break; + handleMessage(consumer.next()); + } + } catch (Throwable t) { + LOG.warn("Failure in NotificationHookConsumer", t); + } + } + } - case ENTITY_PARTIAL_UPDATE: - HookNotification.EntityPartialUpdateRequest partialUpdateRequest = - (HookNotification.EntityPartialUpdateRequest) message; - atlasClient.updateEntity(partialUpdateRequest.getTypeName(), - partialUpdateRequest.getAttribute(), - partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity()); - break; + @VisibleForTesting + void handleMessage(HookNotification.HookNotificationMessage message) { + atlasClient.setUser(message.getUser()); + try { + switch (message.getType()) { + case ENTITY_CREATE: + HookNotification.EntityCreateRequest createRequest = + (HookNotification.EntityCreateRequest) message; + atlasClient.createEntity(createRequest.getEntities()); + break; - case ENTITY_DELETE: - HookNotification.EntityDeleteRequest deleteRequest = - (HookNotification.EntityDeleteRequest) message; - atlasClient.deleteEntity(deleteRequest.getTypeName(), - deleteRequest.getAttribute(), - deleteRequest.getAttributeValue()); - break; + case ENTITY_PARTIAL_UPDATE: + HookNotification.EntityPartialUpdateRequest partialUpdateRequest = + (HookNotification.EntityPartialUpdateRequest) message; + atlasClient.updateEntity(partialUpdateRequest.getTypeName(), + partialUpdateRequest.getAttribute(), + partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity()); + break; - case ENTITY_FULL_UPDATE: - HookNotification.EntityUpdateRequest updateRequest = - (HookNotification.EntityUpdateRequest) message; - atlasClient.updateEntities(updateRequest.getEntities()); - break; + case ENTITY_DELETE: + HookNotification.EntityDeleteRequest deleteRequest = + (HookNotification.EntityDeleteRequest) message; + atlasClient.deleteEntity(deleteRequest.getTypeName(), + deleteRequest.getAttribute(), + deleteRequest.getAttributeValue()); + break; - default: - throw new IllegalStateException("Unhandled exception!"); - } - } catch (Exception e) { - //todo handle failures - LOG.warn("Error handling message {}", message, e); - } - } - } catch (Throwable t) { - LOG.warn("Failure in NotificationHookConsumer", t); + case ENTITY_FULL_UPDATE: + HookNotification.EntityUpdateRequest updateRequest = + (HookNotification.EntityUpdateRequest) message; + atlasClient.updateEntities(updateRequest.getEntities()); + break; + + default: + throw new IllegalStateException("Unhandled exception!"); } + } catch (Exception e) { + //todo handle failures + LOG.warn("Error handling message {}", message, e); } + consumer.commit(); } boolean serverAvailable(Timer timer) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java new file mode 100644 index 0000000..5b2ffeb --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import com.google.inject.Inject; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasException; +import org.apache.atlas.LocalAtlasClient; +import org.apache.atlas.kafka.KafkaNotification; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.commons.lang.RandomStringUtils; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@Guice(modules = NotificationModule.class) +public class NotificationHookConsumerKafkaTest { + + @Inject + private NotificationInterface notificationInterface; + + private KafkaNotification kafkaNotification; + + @BeforeTest + public void setup() throws AtlasException { + kafkaNotification = startKafkaServer(); + } + + @AfterTest + public void shutdown() { + kafkaNotification.stop(); + } + + @Test + public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException { + + produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); + + NotificationConsumer<HookNotification.HookNotificationMessage> consumer = + createNewConsumer(kafkaNotification, false); + LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class); + NotificationHookConsumer notificationHookConsumer = + new NotificationHookConsumer(kafkaNotification, localAtlasClient); + NotificationHookConsumer.HookConsumer hookConsumer = + notificationHookConsumer.new HookConsumer(consumer); + + consumeOneMessage(consumer, hookConsumer); + verify(localAtlasClient).setUser("test_user1"); + + + // produce another message, and make sure it moves ahead. If commit succeeded, this would work. + produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity())); + consumeOneMessage(consumer, hookConsumer); + verify(localAtlasClient).setUser("test_user2"); + + kafkaNotification.close(); + } + + @Test + public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() + throws NotificationException, InterruptedException { + + produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity())); + + NotificationConsumer<HookNotification.HookNotificationMessage> consumer = + createNewConsumer(kafkaNotification, true); + LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class); + NotificationHookConsumer notificationHookConsumer = + new NotificationHookConsumer(kafkaNotification, localAtlasClient); + NotificationHookConsumer.HookConsumer hookConsumer = + notificationHookConsumer.new HookConsumer(consumer); + + consumeOneMessage(consumer, hookConsumer); + verify(localAtlasClient).setUser("test_user3"); + + // produce another message, but this will not be consumed, as commit code is not executed in hook consumer. + produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity())); + + consumeOneMessage(consumer, hookConsumer); + verify(localAtlasClient).setUser("test_user3"); + + kafkaNotification.close(); + } + + NotificationConsumer<HookNotification.HookNotificationMessage> createNewConsumer( + KafkaNotification kafkaNotification, boolean autoCommitEnabled) { + return kafkaNotification.<HookNotification.HookNotificationMessage>createConsumers( + NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0); + } + + void consumeOneMessage(NotificationConsumer<HookNotification.HookNotificationMessage> consumer, + NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException { + while (!consumer.hasNext()) { + Thread.sleep(1000); + } + hookConsumer.handleMessage(consumer.next()); + } + + Referenceable createEntity() { + final Referenceable entity = new Referenceable(AtlasClient.DATA_SET_SUPER_TYPE); + entity.set("name", "db" + randomString()); + entity.set("description", randomString()); + return entity; + } + + KafkaNotification startKafkaServer() throws AtlasException { + KafkaNotification kafkaNotification = (KafkaNotification) notificationInterface; + kafkaNotification.start(); + return kafkaNotification; + } + + protected String randomString() { + return RandomStringUtils.randomAlphanumeric(10); + } + + private void produceMessage(HookNotification.HookNotificationMessage message) throws NotificationException { + notificationInterface.send(NotificationInterface.NotificationType.HOOK, message); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index 8765826..7860eb6 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -21,6 +21,7 @@ import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.LocalAtlasClient; import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.notification.hook.HookNotification; import org.apache.commons.configuration.Configuration; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -34,6 +35,7 @@ import java.util.concurrent.ExecutorService; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; @@ -87,6 +89,40 @@ public class NotificationHookConsumerTest { } @Test + public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException { + NotificationHookConsumer notificationHookConsumer = + new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationConsumer consumer = mock(NotificationConsumer.class); + NotificationHookConsumer.HookConsumer hookConsumer = + notificationHookConsumer.new HookConsumer(consumer); + HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class); + when(message.getUser()).thenReturn("user"); + when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE); + + hookConsumer.handleMessage(message); + + verify(consumer).commit(); + } + + @Test + public void testCommitIsCalledEvenWhenMessageProcessingFails() throws AtlasServiceException { + NotificationHookConsumer notificationHookConsumer = + new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationConsumer consumer = mock(NotificationConsumer.class); + NotificationHookConsumer.HookConsumer hookConsumer = + notificationHookConsumer.new HookConsumer(consumer); + HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class); + when(message.getUser()).thenReturn("user"); + when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE); + when(atlasClient.createEntity(any(List.class))). + thenThrow(new RuntimeException("Simulating exception in processing message")); + + hookConsumer.handleMessage(message); + + verify(consumer).commit(); + } + + @Test public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException { NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer.HookConsumer hookConsumer =
