> On May 18, 2021, 4:34 a.m., Sarath Subramanian wrote:
> > webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java
> > Lines 76 (patched)
> > <https://reviews.apache.org/r/73329/diff/6/?file=2250100#file2250100line76>
> >
> > we don't need to reverse iterate the list, since we are iterating the
> > entire list:
> >
> > consider maintaining a minValue - the timestamp in cache should be the
> > closest to the spooled timestamp: consider using the below method for
> > better readability
> >
> > ```
> > public String getGuidForDeletedEntity(String qualifiedName, long spooledMsgTimestamp) {
> >     if (!this.entitiesDeletedByDelete.containsKey(qualifiedName) || spooledMsgTimestamp <= 0) {
> >         return null;
> >     }
> >
> >     String ret = null;
> >     List<TypesUtil.Pair<Long, String>> timestampGuidPairs = this.entitiesDeletedByDelete.get(qualifiedName);
> >     long minTimestamp = Long.MAX_VALUE;
> >
> >     for (TypesUtil.Pair<Long, String> tsGuidPair : timestampGuidPairs) {
> >         String entityGuid = tsGuidPair.right;
> >         long entityDeleteTimestamp = tsGuidPair.left;
> >         long timestampDifference = Math.abs(entityDeleteTimestamp - spooledMsgTimestamp);
> >
> >         if (timestampDifference < minTimestamp) {
> >             minTimestamp = timestampDifference;
> >             ret = entityGuid;
> >         }
> >     }
> >
> >     return ret;
> > }
> > ```
I modified the logic and added a unit test to cover it. The new logic is better than before and does not use Math.abs.

- Ashutosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/73329/#review223002
-----------------------------------------------------------


On May 18, 2021, 5:39 a.m., Ashutosh Mestry wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/73329/
> -----------------------------------------------------------
>
> (Updated May 18, 2021, 5:39 a.m.)
>
>
> Review request for atlas, Radhika Kundam and Sarath Subramanian.
>
>
> Bugs: ATLAS-4152
>     https://issues.apache.org/jira/browse/ATLAS-4152
>
>
> Repository: atlas
>
>
> Description
> -------
>
> **Background**
> As part of ATLAS-4204, HS2 notifications send entity-lineage only (provided
> the property is enabled).
>
> When spooling is enabled, the order of messages can change. The
> notification messages coming from HS2 and HMS may not arrive in the same
> order as they do with direct notification.
>
> Problem:
> Consider the following sequences of arriving messages for Entity 1
> (C = create, U = update, D = delete, L?x = Lineage of type 'x'):
> No problem: C1, U1, L1x, L1y, D1
> Problem: C1, U1, D1, L1x, L1y
>
> This implementation attempts to handle the problem mentioned above. If the
> above case is not handled, it will end up creating shell entities, since
> deleted entities are not looked up as part of entity creation.
>
> **Approach**
> Uses a bounded-stream approach, where an incoming stream of messages is
> bounded with an indicator that it originates from the spool. This helps make
> localized decisions on the incoming stream of messages.
>
> High-level approach:
> - Messages are tagged with a timestamp when written to the spool.
> - Deleted entities are maintained in a cache.
> - Lineage-only messages are checked to see whether they refer to a deleted
> entity.
> - If they refer to a deleted entity, they are stitched to the one present in
> the cache only if it falls within the threshold.
> - A step-climbing approach is used to locate the right entity to stitch the
> lineage to.
>
> New: _EntityCorrelationManager_: Uses message timestamp and cached entity
> qualifiedName-GUID map.
> Modified: _NotificationHookConsumer_: Uses the new class.
> New: _HiveDDLLineagePreprocess_: Uses entity-correlation to link to deleted
> entities.
> Modified: _SpoolConfiguration_: Added new configuration to pause message
> sending after destination is available:
> _atlas.hook.spool.pause.before.send.sec_.
>
>
> Diffs
> -----
>
>   intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
>     810ba97c9
>   notification/src/main/java/org/apache/atlas/hook/AtlasHook.java 9162ac144
>   notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
>     f7d9668ec
>   notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
>     22bd79fdf
>   notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
>     3d1b3ccf1
>   notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
>     3264e264c
>   notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
>     edd8ed931
>   notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
>     2d7d19595
>   notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
>     22242c933
>   notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
>     a9a3a78cc
>   notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
>     2cacaaadc
>   notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
>     d7e4959f7
>   notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
>     167efbecc
>   webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java
>     PRE-CREATION
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
>     84cc8d813
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
>     89568e236
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
>     PRE-CREATION
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
>     86e3384ee
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
>     PRE-CREATION
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
>     608b4a304
>   webapp/src/test/java/org/apache/atlas/notification/EntityCorrelationManagerTest.java
>     PRE-CREATION
>
>
> Diff: https://reviews.apache.org/r/73329/diff/7/
>
>
> Testing
> -------
>
> **Functional tests**
> Manual verification of scenarios.
>
> **Test data**
> 11:55: Kafka Down!
> 12:00: create table t01(c01 string);
> 12:10: create view t06_vw as select * from t01;
> 12:20: create view t06_vw_1 as select * from t01;
> 12:30: create view t06_vw_2 as select * from t01;
> 12:40: create view t06_vw_3 as select * from t01;
> 12:50: DROP TABLE t01;
> 12:52: create table t01(c01 string);
> 12:53: create view t06_vwx as select * from t01;
> 12:54: create view t06_vwx_1 as select * from t01;
> 12:55: create view t06_vwx_2 as select * from t01;
> 12:56: create view t06_vwx_3 as select * from t01;
> 12:57: DROP TABLE T01;
> 12:58: create table t01(c01 string);
> 01:00: Kafka UP!
>
>
> Thanks,
>
> Ashutosh Mestry
>
>
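
Note on the lookup discussed above: the following is a minimal sketch of one way to pick a deleted entity without Math.abs. It is not the code in the patch. It assumes the rule "use the first deletion recorded at or after the spooled message timestamp, provided the gap is within a threshold". The class name DeletedEntityCacheSketch, the recordDelete method and the threshold parameter are hypothetical; only the method name getGuidForDeletedEntity is borrowed from the review comment. Timestamps are minutes since midnight so that they line up with the test data (12:50 = 770, 12:57 = 777); a real implementation would presumably use epoch milliseconds.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical sketch; not the EntityCorrelationManager from the patch. */
public class DeletedEntityCacheSketch {
    // qualifiedName -> (delete timestamp -> GUID of the entity deleted at that time)
    private final Map<String, NavigableMap<Long, String>> deletedByName = new ConcurrentHashMap<>();
    // Assumed meaning of "threshold": largest allowed gap between message and deletion.
    private final long threshold;

    public DeletedEntityCacheSketch(long threshold) {
        this.threshold = threshold;
    }

    public void recordDelete(String qualifiedName, long deleteTimestamp, String guid) {
        deletedByName
            .computeIfAbsent(qualifiedName, k -> new ConcurrentSkipListMap<Long, String>())
            .put(deleteTimestamp, guid);
    }

    /** Returns the GUID of the first entity deleted at or after the message timestamp, or null. */
    public String getGuidForDeletedEntity(String qualifiedName, long spooledMsgTimestamp) {
        NavigableMap<Long, String> byDeleteTime = deletedByName.get(qualifiedName);
        if (byDeleteTime == null || spooledMsgTimestamp <= 0) {
            return null;
        }

        // ceilingEntry: smallest delete timestamp >= spooledMsgTimestamp, so no Math.abs is needed.
        Map.Entry<Long, String> firstDeleteAfter = byDeleteTime.ceilingEntry(spooledMsgTimestamp);
        if (firstDeleteAfter == null || firstDeleteAfter.getKey() - spooledMsgTimestamp > threshold) {
            return null;
        }
        return firstDeleteAfter.getValue();
    }

    public static void main(String[] args) {
        DeletedEntityCacheSketch cache = new DeletedEntityCacheSketch(60);
        cache.recordDelete("t01", 770, "guid-first");   // DROP TABLE at 12:50
        cache.recordDelete("t01", 777, "guid-second");  // DROP TABLE at 12:57

        System.out.println(cache.getGuidForDeletedEntity("t01", 730)); // 12:10 view -> guid-first
        System.out.println(cache.getGuidForDeletedEntity("t01", 773)); // 12:53 view -> guid-second
        System.out.println(cache.getGuidForDeletedEntity("t01", 778)); // after last drop -> null
    }
}
```

With the test data timings, a nearest-by-absolute-difference rule would match the 12:53 message to the 12:50 deletion (3 minutes away) rather than the 12:57 one (4 minutes away). The rule sketched here does not, because it only looks forward in time.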