[ 
https://issues.apache.org/jira/browse/HDDS-1105?focusedWorklogId=293303&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-293303
 ]

ASF GitHub Bot logged work on HDDS-1105:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Aug/19 18:29
            Start Date: 12/Aug/19 18:29
    Worklog Time Spent: 10m 
      Work Description: swagle commented on pull request #1259: HDDS-1105 : Add 
mechanism in Recon to obtain DB snapshot 'delta' updates from Ozone Manager
URL: https://github.com/apache/hadoop/pull/1259#discussion_r313065850
 
 

 ##########
 File path: 
hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
 ##########
 @@ -158,4 +165,162 @@ public void testGetOzoneManagerDBSnapshot() throws 
Exception {
     assertTrue(checkpoint.getCheckpointLocation().toFile()
         .listFiles().length == 2);
   }
+
+  @Test
+  public void testGetAndApplyDeltaUpdatesFromOM() throws Exception {
+
+    // Writing 2 Keys into a source OM DB and collecting it in a
+    // DBUpdatesWrapper.
+    OMMetadataManager sourceOMMetadataMgr = initializeNewOmMetadataManager();
+    writeDataToOm(sourceOMMetadataMgr, "key_one");
+    writeDataToOm(sourceOMMetadataMgr, "key_two");
+
+    RocksDB rocksDB = ((RDBStore)sourceOMMetadataMgr.getStore()).getDb();
+    TransactionLogIterator transactionLogIterator = 
rocksDB.getUpdatesSince(0L);
+    DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
+    while(transactionLogIterator.isValid()) {
+      TransactionLogIterator.BatchResult result =
+          transactionLogIterator.getBatch();
+      result.writeBatch().markWalTerminationPoint();
+      WriteBatch writeBatch = result.writeBatch();
+      dbUpdatesWrapper.addWriteBatch(writeBatch.data(),
+          result.sequenceNumber());
+      transactionLogIterator.next();
+    }
+
+    // OM Service Provider's Metadata Manager.
+    OMMetadataManager omMetadataManager = initializeNewOmMetadataManager();
+
+    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+        new OzoneManagerServiceProviderImpl(configuration,
+            getTestMetadataManager(omMetadataManager),
+            getMockTaskController(), new ReconUtils());
+    OzoneManagerProtocol ozoneManagerProtocolMock =
+        mock(OzoneManagerProtocol.class);
+    when(ozoneManagerProtocolMock.getDBUpdates(any(OzoneManagerProtocolProtos
+        .DBUpdatesRequest.class))).thenReturn(dbUpdatesWrapper);
+    injectOzoneServiceProviderField(ozoneManagerServiceProvider,
+        ozoneManagerProtocolMock, "ozoneManagerClient");
+
+    OMDBUpdatesHandler updatesHandler =
+        new OMDBUpdatesHandler(omMetadataManager);
+    ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
+        0L, updatesHandler);
+
+    // In this method, we have to assert the "GET" part and the "APPLY" path.
+
+    // Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
+    // events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
+    assertEquals(4, updatesHandler.getEvents().size());
+
+    // Assert APPLY path --> Verify if the OM service provider's RocksDB got
+    // the changes.
+    String fullKey = omMetadataManager.getOzoneKey("sampleVol",
+        "bucketOne", "key_one");
+    assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+        .getKeyTable().isExist(fullKey));
+    fullKey = omMetadataManager.getOzoneKey("sampleVol",
+        "bucketOne", "key_two");
+    assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+        .getKeyTable().isExist(fullKey));
+  }
+
+  @Test
+  public void testSyncDataFromOMFullSnapshot() throws Exception {
+
+    // Empty OM DB to start with.
+    ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
+        initializeEmptyOmMetadataManager());
+    ReconTaskStatusDao reconTaskStatusDaoMock =
+        mock(ReconTaskStatusDao.class);
+    doNothing().when(reconTaskStatusDaoMock)
+        .update(any(ReconTaskStatus.class));
+
+    ReconTaskController reconTaskControllerMock = getMockTaskController();
+    when(reconTaskControllerMock.getReconTaskStatusDao())
+        .thenReturn(reconTaskStatusDaoMock);
+    doNothing().when(reconTaskControllerMock)
+        .reInitializeTasks(omMetadataManager);
+
+    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+        new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
+            reconTaskControllerMock, new ReconUtils());
+
+    //Should trigger full snapshot request.
+    ozoneManagerServiceProvider.syncDataFromOM();
+
+    ArgumentCaptor<ReconTaskStatus> captor =
+        ArgumentCaptor.forClass(ReconTaskStatus.class);
+    verify(reconTaskStatusDaoMock, times(1))
+        .update(captor.capture());
+    assertTrue(captor.getValue().getTaskName().equals("OM_DB_FULL_SNAPSHOT"));
+    verify(reconTaskControllerMock, times(1))
+        .reInitializeTasks(omMetadataManager);
+  }
+
+  @Test
+  public void testSyncDataFromOMDeltaUpdates() throws Exception {
+
+    // Non-Empty OM DB to start with.
+    ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
+        initializeNewOmMetadataManager());
+    ReconTaskStatusDao reconTaskStatusDaoMock =
+        mock(ReconTaskStatusDao.class);
+    doNothing().when(reconTaskStatusDaoMock)
+        .update(any(ReconTaskStatus.class));
+
+    ReconTaskController reconTaskControllerMock = getMockTaskController();
+    when(reconTaskControllerMock.getReconTaskStatusDao())
+        .thenReturn(reconTaskStatusDaoMock);
+    doNothing().when(reconTaskControllerMock)
+        .consumeOMEvents(any(OMUpdateEventBatch.class),
+            any(OMMetadataManager.class));
+
+    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+        new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
+            reconTaskControllerMock, new ReconUtils());
+    OzoneManagerProtocol ozoneManagerProtocolMock =
+        mock(OzoneManagerProtocol.class);
+    when(ozoneManagerProtocolMock.getDBUpdates(any(OzoneManagerProtocolProtos
+        .DBUpdatesRequest.class))).thenReturn(new DBUpdatesWrapper());
+    injectOzoneServiceProviderField(ozoneManagerServiceProvider,
+        ozoneManagerProtocolMock, "ozoneManagerClient");
+
+    // Should trigger delta updates.
+    ozoneManagerServiceProvider.syncDataFromOM();
+
+    ArgumentCaptor<ReconTaskStatus> captor =
+        ArgumentCaptor.forClass(ReconTaskStatus.class);
+    verify(reconTaskStatusDaoMock, times(1))
+        .update(captor.capture());
+    assertTrue(captor.getValue().getTaskName().equals("OM_DB_DELTA_UPDATES"));
+
+    verify(reconTaskControllerMock, times(1))
+        .consumeOMEvents(any(OMUpdateEventBatch.class),
+            any(OMMetadataManager.class));
+  }
+
+  private ReconTaskController getMockTaskController() {
+    ReconTaskController reconTaskControllerMock =
+        mock(ReconTaskController.class);
+    return reconTaskControllerMock;
+  }
+
+  private ReconUtils getMockReconUtils() throws IOException {
+    ReconUtils reconUtilsMock = mock(ReconUtils.class);
+    when(reconUtilsMock.getReconDbDir(any(), 
anyString())).thenCallRealMethod();
+    doCallRealMethod().when(reconUtilsMock).untarCheckpointFile(any(), any());
+    return reconUtilsMock;
+  }
+
+  private void injectOzoneServiceProviderField(
+      OzoneManagerServiceProviderImpl ozoneManagerServiceProvider,
+      Object fieldValue, String fieldName)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f1 = ozoneManagerServiceProvider.getClass()
 
 Review comment:
   Let's use the @Provides method for some stateful instance creation instead 
of ugly reflection code :-)
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 293303)
    Time Spent: 1h 20m  (was: 1h 10m)

> Add mechanism in Recon to obtain DB snapshot 'delta' updates from Ozone 
> Manager.
> --------------------------------------------------------------------------------
>
>                 Key: HDDS-1105
>                 URL: https://issues.apache.org/jira/browse/HDDS-1105
>             Project: Hadoop Distributed Data Store
>          Issue Type: Sub-task
>            Reporter: Aravindan Vijayan
>            Assignee: Aravindan Vijayan
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> *Some context*
> The FSCK server will periodically invoke this OM API passing in the most 
> recent sequence number of its own RocksDB instance. The OM will use the 
> RocksDB getUpdatesSince() API to answer this query. Since the getUpdatesSince 
> API only works against the RocksDB WAL, we have to configure OM RocksDB WAL 
> (https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log) with sufficient 
> max size to make this API useful. If the OM cannot get all transactions since 
> the given sequence number (due to WAL flushing), it can error out. In that 
> case the FSCK server can fall back to getting the entire checkpoint snapshot 
> implemented in HDDS-1085.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to