YARN-5432. Lock already held by another process while LevelDB cache store 
creation for dag. Contributed by Li Lu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f3c306e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f3c306e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f3c306e

Branch: refs/heads/HADOOP-12756
Commit: 7f3c306e2ed9e865171324898decb9b587d90e73
Parents: 414fbfa
Author: Junping Du <junping...@apache.org>
Authored: Thu Jul 28 06:35:24 2016 -0700
Committer: Junping Du <junping...@apache.org>
Committed: Thu Jul 28 06:35:24 2016 -0700

----------------------------------------------------------------------
 .../src/main/resources/yarn-default.xml         |  11 ++
 .../yarn/server/timeline/EntityCacheItem.java   |  46 --------
 .../timeline/EntityGroupFSTimelineStore.java    |  26 +----
 .../TestEntityGroupFSTimelineStore.java         | 113 +------------------
 4 files changed, 15 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f3c306e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index ed220c0..c8bc741 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2168,6 +2168,17 @@
   </property>
 
   <property>
+    <name>yarn.timeline-service.entity-group-fs-store.app-cache-size</name>
+    <description>
+      Size of the reader cache for ATS v1.5 reader. This value controls how many
+      entity groups the ATS v1.5 server should cache. If the number of active
+      read entity groups is greater than the number of cache items, some reads
+      may return empty data. This value must be greater than 0.
+    </description>
+    <value>10</value>
+  </property>
+
+  <property>
     <name>yarn.timeline-service.client.fd-flush-interval-secs</name>
     <description>
       Flush interval for ATS v1.5 writer. This value controls how frequent

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f3c306e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
index 2b6e023..7ed7c4a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
@@ -16,8 +16,6 @@
  */
 package org.apache.hadoop.yarn.server.timeline;
 
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
@@ -26,7 +24,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Cache item for timeline server v1.5 reader cache. Each cache item has a
@@ -41,8 +38,6 @@ public class EntityCacheItem {
   private EntityGroupFSTimelineStore.AppLogs appLogs;
   private long lastRefresh;
   private Configuration config;
-  private int refCount = 0;
-  private static AtomicInteger activeStores = new AtomicInteger(0);
 
   public EntityCacheItem(TimelineEntityGroupId gId, Configuration config) {
     this.groupId = gId;
@@ -76,13 +71,6 @@ public class EntityCacheItem {
   }
 
   /**
-   * @return The number of currently active stores in all CacheItems.
-   */
-  public static int getActiveStores() {
-    return activeStores.get();
-  }
-
-  /**
    * Refresh this cache item if it needs refresh. This will enforce an appLogs
    * rescan and then load new data. The refresh process is synchronized with
    * other operations on the same cache item.
@@ -107,7 +95,6 @@ public class EntityCacheItem {
       }
       if (!appLogs.getDetailLogs().isEmpty()) {
         if (store == null) {
-          activeStores.getAndIncrement();
           store = new LevelDBCacheTimelineStore(groupId.toString(),
               "LeveldbCache." + groupId);
           store.init(config);
@@ -134,31 +121,6 @@ public class EntityCacheItem {
   }
 
   /**
-   * Increase the number of references to this cache item by 1.
-   */
-  public synchronized void incrRefs() {
-    refCount++;
-  }
-
-  /**
-   * Unregister a reader. Try to release the cache if the reader to current
-   * cache reaches 0.
-   *
-   * @return true if the cache has been released, otherwise false
-   */
-  public synchronized boolean tryRelease() {
-    refCount--;
-    // Only reclaim the storage if there is no reader.
-    if (refCount > 0) {
-      LOG.debug("{} references left for cached group {}, skipping the release",
-          refCount, groupId);
-      return false;
-    }
-    forceRelease();
-    return true;
-  }
-
-  /**
    * Force releasing the cache item for the given group id, even though there
    * may be active references.
    */
@@ -171,8 +133,6 @@ public class EntityCacheItem {
       LOG.warn("Error closing timeline store", e);
     }
     store = null;
-    activeStores.getAndDecrement();
-    refCount = 0;
     // reset offsets so next time logs are re-parsed
     for (LogInfo log : appLogs.getDetailLogs()) {
       if (log.getFilename().contains(groupId.toString())) {
@@ -182,12 +142,6 @@ public class EntityCacheItem {
     LOG.debug("Cache for group {} released. ", groupId);
   }
 
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  synchronized int getRefCount() {
-    return refCount;
-  }
-
   private boolean needRefresh() {
     return (Time.monotonicNow() - lastRefresh > 10000);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f3c306e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
index 9593a31..a9d3c1d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -181,15 +181,8 @@ public class EntityGroupFSTimelineStore extends 
CompositeService
               TimelineEntityGroupId groupId = eldest.getKey();
               LOG.debug("Evicting {} due to space limitations", groupId);
               EntityCacheItem cacheItem = eldest.getValue();
-              int activeStores = EntityCacheItem.getActiveStores();
-              if (activeStores > appCacheMaxSize * CACHE_ITEM_OVERFLOW_FACTOR) 
{
-                LOG.debug("Force release cache {} since {} stores are active",
-                    groupId, activeStores);
-                cacheItem.forceRelease();
-              } else {
-                LOG.debug("Try release cache {}", groupId);
-                cacheItem.tryRelease();
-              }
+              LOG.debug("Force release cache {}.", groupId);
+              cacheItem.forceRelease();
               if (cacheItem.getAppLogs().isDone()) {
                 appIdLogMap.remove(groupId.getApplicationId());
               }
@@ -920,7 +913,6 @@ public class EntityGroupFSTimelineStore extends 
CompositeService
   @InterfaceAudience.Private
   @VisibleForTesting
   void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) 
{
-    cacheItem.incrRefs();
     cachedLogs.put(groupId, cacheItem);
   }
 
@@ -1003,8 +995,6 @@ public class EntityGroupFSTimelineStore extends 
CompositeService
           LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
           cacheItem.setAppLogs(appLogs);
           this.cachedLogs.put(groupId, cacheItem);
-          // Add the reference by the cache
-          cacheItem.incrRefs();
         } else {
           LOG.warn("AppLogs for groupId {} is set to null!", groupId);
         }
@@ -1014,8 +1004,6 @@ public class EntityGroupFSTimelineStore extends 
CompositeService
     if (cacheItem.getAppLogs() != null) {
       AppLogs appLogs = cacheItem.getAppLogs();
       LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
-      // Add the reference by the store
-      cacheItem.incrRefs();
       cacheItems.add(cacheItem);
       store = cacheItem.refreshCache(aclManager, metrics);
     } else {
@@ -1024,12 +1012,6 @@ public class EntityGroupFSTimelineStore extends 
CompositeService
     return store;
   }
 
-  protected void tryReleaseCacheItems(List<EntityCacheItem> relatedCacheItems) 
{
-    for (EntityCacheItem item : relatedCacheItems) {
-      item.tryRelease();
-    }
-  }
-
   @Override
   public TimelineEntities getEntities(String entityType, Long limit,
       Long windowStart, Long windowEnd, String fromId, Long fromTs,
@@ -1049,7 +1031,6 @@ public class EntityGroupFSTimelineStore extends 
CompositeService
         returnEntities.addEntities(entities.getEntities());
       }
     }
-    tryReleaseCacheItems(relatedCacheItems);
     return returnEntities;
   }
 
@@ -1066,12 +1047,10 @@ public class EntityGroupFSTimelineStore extends 
CompositeService
       TimelineEntity e =
           store.getEntity(entityId, entityType, fieldsToRetrieve);
       if (e != null) {
-        tryReleaseCacheItems(relatedCacheItems);
         return e;
       }
     }
     LOG.debug("getEntity: Found nothing");
-    tryReleaseCacheItems(relatedCacheItems);
     return null;
   }
 
@@ -1099,7 +1078,6 @@ public class EntityGroupFSTimelineStore extends 
CompositeService
         }
       }
     }
-    tryReleaseCacheItems(relatedCacheItems);
     return returnEvents;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f3c306e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
index 5a0822d..8540d45 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -57,10 +57,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
 
 import static 
org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
 import static org.junit.Assert.assertEquals;
@@ -68,7 +64,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
 
@@ -98,7 +93,7 @@ public class TestEntityGroupFSTimelineStore extends 
TimelineStoreTestUtils {
   private static Path testDoneDirPath;
   private static String mainEntityLogFileName;
 
-  private EntityGroupFSTimelineStoreForTest store;
+  private EntityGroupFSTimelineStore store;
   private TimelineEntity entityNew;
 
   @Rule
@@ -150,7 +145,7 @@ public class TestEntityGroupFSTimelineStore extends 
TimelineStoreTestUtils {
       createTestFiles(appId, attemotDirPath);
     }
 
-    store = new EntityGroupFSTimelineStoreForTest();
+    store = new EntityGroupFSTimelineStore();
     if (currTestName.getMethodName().contains("Plugin")) {
       rootDir = GenericTestUtils.getTestDir(getClass()
           .getSimpleName());
@@ -375,8 +370,6 @@ public class TestEntityGroupFSTimelineStore extends 
TimelineStoreTestUtils {
         UserGroupInformation.getLoginUser());
     assertNotNull(entity3);
     assertEquals(entityNew.getStartTime(), entity3.getStartTime());
-    assertEquals(1, cacheItem.getRefCount());
-    assertEquals(1, EntityCacheItem.getActiveStores());
     // Verify multiple entities read
     NameValuePair primaryFilter = new NameValuePair(
         EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString());
@@ -392,74 +385,6 @@ public class TestEntityGroupFSTimelineStore extends 
TimelineStoreTestUtils {
     assertEquals(cacheRefreshBefore + 1L, 
cacheRefresh.lastStat().numSamples());
   }
 
-  @Test(timeout = 90000L)
-  public void testMultiplePluginRead() throws Exception {
-    Thread mainThread = Thread.currentThread();
-    mainThread.setName("testMain");
-    // Verify precondition
-    assertEquals(EntityGroupPlugInForTest.class.getName(),
-        store.getConfig().get(
-            YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
-    // Prepare timeline store by making cache items
-    EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
-            AppState.COMPLETED);
-    final EntityCacheItem cacheItem = new EntityCacheItem(
-        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
-        config);
-
-    cacheItem.setAppLogs(appLogs);
-    store.setCachedLogs(
-        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
-        cacheItem);
-
-    // Launch the blocking read call in a future
-    ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
-    FutureTask<TimelineEntity> blockingReader =
-        new FutureTask<>(new Callable<TimelineEntity>() {
-          public TimelineEntity call() throws Exception {
-            Thread currThread = Thread.currentThread();
-            currThread.setName("blockingReader");
-            return store.getEntityBlocking(mainTestAppId.toString(), "type_3",
-                EnumSet.allOf(TimelineReader.Field.class));
-          }});
-    threadExecutor.execute(blockingReader);
-    try {
-      while (!store.testCacheReferenced) {
-        Thread.sleep(300);
-      }
-    } catch (InterruptedException e) {
-      fail("Interrupted on exception " + e);
-    }
-    // Try refill the cache after the first cache item is referenced
-    for (ApplicationId appId : sampleAppIds) {
-      // Skip the first appId since it's already in cache
-      if (appId.equals(mainTestAppId)) {
-        continue;
-      }
-      EntityGroupFSTimelineStore.AppLogs currAppLog =
-          store.new AppLogs(appId, getTestRootPath(appId.toString()),
-              AppState.COMPLETED);
-      EntityCacheItem item = new EntityCacheItem(
-          EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
-          config);
-      item.setAppLogs(currAppLog);
-      store.setCachedLogs(
-          EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
-          item);
-    }
-    // At this time, the cache item of the blocking reader should be evicted.
-    assertEquals(1, cacheItem.getRefCount());
-    store.testCanProceed = true;
-    TimelineEntity entity3 = blockingReader.get();
-
-    assertNotNull(entity3);
-    assertEquals(entityNew.getStartTime(), entity3.getStartTime());
-    assertEquals(0, cacheItem.getRefCount());
-
-    threadExecutor.shutdownNow();
-  }
-
   @Test
   public void testSummaryRead() throws Exception {
     // Load data
@@ -518,38 +443,4 @@ public class TestEntityGroupFSTimelineStore extends 
TimelineStoreTestUtils {
   private static String getAttemptDirName(ApplicationId appId) {
     return ApplicationAttemptId.appAttemptIdStrPrefix + appId.toString() + 
"_1";
   }
-
-  private static class EntityGroupFSTimelineStoreForTest
-      extends EntityGroupFSTimelineStore {
-    // Flags used for the concurrent testing environment
-    private volatile boolean testCanProceed = false;
-    private volatile boolean testCacheReferenced = false;
-
-    TimelineEntity getEntityBlocking(String entityId, String entityType,
-        EnumSet<Field> fieldsToRetrieve) throws IOException {
-      List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
-      List<TimelineStore> stores = getTimelineStoresForRead(entityId,
-          entityType, relatedCacheItems);
-
-      testCacheReferenced = true;
-      try {
-        while (!testCanProceed) {
-          Thread.sleep(1000);
-        }
-      } catch (InterruptedException e) {
-        fail("Interrupted " + e);
-      }
-
-      for (TimelineStore store : stores) {
-        TimelineEntity e =
-            store.getEntity(entityId, entityType, fieldsToRetrieve);
-        if (e != null) {
-          tryReleaseCacheItems(relatedCacheItems);
-          return e;
-        }
-      }
-      tryReleaseCacheItems(relatedCacheItems);
-      return null;
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to