This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 9349e4adf50e20b79e3675dd798009039041defb
Author: XiaoxiangYu <hit_la...@126.com>
AuthorDate: Sun Sep 1 23:15:27 2019 +0800

    KYLIN-4153 Delete marker if real file not exists
---
 .../common/persistence/PushdownResourceStore.java  | 29 ++++---
 .../kylin/common/persistence/ResourceStore.java    | 91 ++++++++--------------
 .../apache/kylin/rest/job/MetadataCleanupJob.java  |  2 +-
 3 files changed, 51 insertions(+), 71 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
index 7cb8ca6..587b691 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
@@ -39,21 +39,21 @@ import org.slf4j.LoggerFactory;
  * - The big resource is saved as HDFS file according to its resource path.
  * - Method like checkAndPut() does not work on big resource like such, 
because HDFS lack of transaction support.
  */
-abstract public class PushdownResourceStore extends ResourceStore {
-    private static final Logger logger = 
LoggerFactory.getLogger(HDFSResourceStore.class);
+public abstract class PushdownResourceStore extends ResourceStore {
+    private static final Logger logger = 
LoggerFactory.getLogger(PushdownResourceStore.class);
 
     protected PushdownResourceStore(KylinConfig kylinConfig) {
         super(kylinConfig);
     }
 
-    final protected void putResourceImpl(String resPath, ContentWriter 
content, long ts) throws IOException {
+    protected final void putResourceImpl(String resPath, ContentWriter 
content, long ts) throws IOException {
         if (content.isBigContent())
             putBigResource(resPath, content, ts);
         else
             putSmallResource(resPath, content, ts);
     }
 
-    abstract protected void putSmallResource(String resPath, ContentWriter 
content, long ts) throws IOException;
+    protected abstract void putSmallResource(String resPath, ContentWriter 
content, long ts) throws IOException;
 
     final void putBigResource(String resPath, ContentWriter content, long 
newTS) throws IOException {
 
@@ -84,12 +84,14 @@ abstract public class PushdownResourceStore extends 
ResourceStore {
         Path backPath;
         boolean hasOldFile;
         boolean hasRollback = false;
+        String resPathStr;
 
         private RollbackablePushdown(String resPath, ContentWriter content) 
throws IOException {
             int salt = System.identityHashCode(resPath) + 
System.identityHashCode(content);
             tempPath = pushdownPath(resPath + ".temp." + salt);
             realPath = pushdownPath(resPath);
             backPath = pushdownPath(resPath + ".orig." + salt);
+            resPathStr = resPath;
             fs = pushdownFS();
 
             if (fs.exists(tempPath))
@@ -136,9 +138,12 @@ abstract public class PushdownResourceStore extends 
ResourceStore {
                 if (fs.exists(realPath))
                     fs.delete(realPath, true);
 
-                if (hasOldFile)
+                if (hasOldFile) {
                     fs.rename(backPath, realPath);
-
+                } else {
+                    logger.warn("Try delete empty entry {}", resPathStr);
+                    deleteResourceImpl(resPathStr);
+                }
             } catch (IOException ex2) {
                 logger.error("Rollback failed", ex2);
             }
@@ -167,23 +172,25 @@ abstract public class PushdownResourceStore extends 
ResourceStore {
         try {
             Path p = pushdownPath(resPath);
             FileSystem fs = pushdownFS();
-            if (fs.exists(p))
+            if (fs.exists(p)) {
                 return fs.open(p);
-            else
+            } else {
+                logger.error("Marker exists but real file not found, delete 
marker.");
+                deleteResourceImpl(resPath);
                 throw new FileNotFoundException(p.toString() + "  (FS: " + fs 
+ ")");
-
+            }
         } catch (Exception ex) {
             throw new IOException("Failed to read big resource " + resPath, 
ex);
         }
     }
 
-    abstract protected String pushdownRootPath();
+    protected abstract String pushdownRootPath();
 
     protected FileSystem pushdownFS() {
         return HadoopUtil.getFileSystem(new 
Path(kylinConfig.getMetastoreBigCellHdfsDirectory()));
     }
 
-    final protected Path pushdownPath(String resPath) {
+    protected final Path pushdownPath(String resPath) {
         Path p = new Path(pushdownRootPath() + resPath);
         return Path.getPathWithoutSchemeAndAuthority(p);
     }
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 3eeeb69..6568279 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -136,7 +136,7 @@ abstract public class ResourceStore {
                     StringEntity.serializer);
         }
         StringEntity entity = getResource(ResourceStore.METASTORE_UUID_TAG, 
StringEntity.serializer);
-        return entity.toString();
+        return entity == null ? "":entity.toString();
     }
 
     /**
@@ -286,12 +286,7 @@ abstract public class ResourceStore {
 
     private RawResource getResourceWithRetry(final String resPath) throws 
IOException {
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
-        return retry.doWithRetry(new Callable<RawResource>() {
-            @Override
-            public RawResource call() throws IOException {
-                return getResourceImpl(resPath);
-            }
-        });
+        return retry.doWithRetry(() -> getResourceImpl(resPath));
     }
 
     final public long getResourceTimestamp(String resPath) throws IOException {
@@ -307,12 +302,7 @@ abstract public class ResourceStore {
         final String path = norm(resPath);
 
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
-        return retry.doWithRetry(new Callable<Long>() {
-            @Override
-            public Long call() throws IOException {
-                return getResourceTimestampImpl(path);
-            }
-        });
+        return retry.doWithRetry(() -> getResourceTimestampImpl(path));
     }
 
     /**
@@ -364,19 +354,16 @@ abstract public class ResourceStore {
     protected void putResourceWithRetry(final String resPath, final 
ContentWriter content, final long ts)
             throws IOException {
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
-        retry.doWithRetry(new Callable() {
-            @Override
-            public Object call() throws IOException {
-                putResourceImpl(resPath, content, ts);
-                return null;
-            }
+        retry.doWithRetry(() -> {
+            putResourceImpl(resPath, content, ts);
+            return null;
         });
     }
 
     /**
      * check & set, overwrite a resource
      */
-    final public <T extends RootPersistentEntity> void 
checkAndPutResource(String resPath, T obj,
+    public final <T extends RootPersistentEntity> void 
checkAndPutResource(String resPath, T obj,
             Serializer<T> serializer) throws IOException, 
WriteConflictException {
         checkAndPutResource(resPath, obj, System.currentTimeMillis(), 
serializer);
     }
@@ -384,7 +371,7 @@ abstract public class ResourceStore {
     /**
      * check & set, overwrite a resource
      */
-    final public <T extends RootPersistentEntity> void 
checkAndPutResource(String resPath, T obj, long newTS,
+    public final <T extends RootPersistentEntity> void 
checkAndPutResource(String resPath, T obj, long newTS,
             Serializer<T> serializer) throws IOException, 
WriteConflictException {
         resPath = norm(resPath);
 
@@ -413,7 +400,7 @@ abstract public class ResourceStore {
      *
      * @return a confirmed TS, as some store may lose timestamp precision.
      */
-    final public long checkAndPutResource(String resPath, byte[] content, long 
oldTS, long newTS)
+    public final long checkAndPutResource(String resPath, byte[] content, long 
oldTS, long newTS)
             throws IOException, WriteConflictException {
         return checkAndPutResourceCheckpoint(norm(resPath), content, oldTS, 
newTS);
     }
@@ -427,24 +414,19 @@ abstract public class ResourceStore {
     /**
      * checks old timestamp when overwriting existing
      */
-    abstract protected long checkAndPutResourceImpl(String resPath, byte[] 
content, long oldTS, long newTS)
+    protected abstract long checkAndPutResourceImpl(String resPath, byte[] 
content, long oldTS, long newTS)
             throws IOException, WriteConflictException;
 
     private long checkAndPutResourceWithRetry(final String resPath, final 
byte[] content, final long oldTS,
             final long newTS) throws IOException, WriteConflictException {
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
-        return retry.doWithRetry(new Callable<Long>() {
-            @Override
-            public Long call() throws IOException {
-                return checkAndPutResourceImpl(resPath, content, oldTS, newTS);
-            }
-        });
+        return retry.doWithRetry(() -> checkAndPutResourceImpl(resPath, 
content, oldTS, newTS));
     }
 
     /**
      * update resource timestamp to timestamp
      */
-    final public void updateTimestamp(String resPath, long timestamp) throws 
IOException {
+    public final void updateTimestamp(String resPath, long timestamp) throws 
IOException {
         logger.trace("Updating resource: {} with timestamp {} (Store {})", 
resPath, timestamp,
                 kylinConfig.getMetadataUrl());
         updateTimestampCheckPoint(norm(resPath), timestamp);
@@ -457,21 +439,18 @@ abstract public class ResourceStore {
 
     private void updateTimestampWithRetry(final String resPath, final long 
timestamp) throws IOException {
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
-        retry.doWithRetry(new Callable() {
-            @Override
-            public Object call() throws IOException {
-                updateTimestampImpl(resPath, timestamp);
-                return null;
-            }
+        retry.doWithRetry(() -> {
+            updateTimestampImpl(resPath, timestamp);
+            return null;
         });
     }
 
-    abstract protected void updateTimestampImpl(String resPath, long 
timestamp) throws IOException;
+    protected abstract void updateTimestampImpl(String resPath, long 
timestamp) throws IOException;
 
     /**
      * delete a resource, does nothing on a folder
      */
-    final public void deleteResource(String resPath) throws IOException {
+    public final void deleteResource(String resPath) throws IOException {
         logger.trace("Deleting resource {} (Store {})", resPath, 
kylinConfig.getMetadataUrl());
         deleteResourceCheckpoint(norm(resPath));
     }
@@ -482,7 +461,7 @@ abstract public class ResourceStore {
      * throw an IOException when the resource lastModified >= timestamp + 1000
      * See  https://issues.apache.org/jira/browse/KYLIN-4030
      */
-    final public void deleteResource(String resPath, long timestamp) throws 
IOException {
+    public final void deleteResource(String resPath, long timestamp) throws 
IOException {
         logger.trace("Deleting resource {} within timestamp {} (Store {})", 
resPath, timestamp,
                 kylinConfig.getMetadataUrl());
         deleteResourceCheckpoint(norm(resPath), timestamp);
@@ -498,29 +477,23 @@ abstract public class ResourceStore {
         deleteResourceWithRetry(resPath, timestamp);
     }
 
-    abstract protected void deleteResourceImpl(String resPath) throws 
IOException;
+    protected abstract void deleteResourceImpl(String resPath) throws 
IOException;
 
-    abstract protected void deleteResourceImpl(String resPath, long timestamp) 
throws IOException;
+    protected abstract void deleteResourceImpl(String resPath, long timestamp) 
throws IOException;
 
     private void deleteResourceWithRetry(final String resPath) throws 
IOException {
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
-        retry.doWithRetry(new Callable() {
-            @Override
-            public Object call() throws IOException {
-                deleteResourceImpl(resPath);
-                return null;
-            }
+        retry.doWithRetry(() -> {
+            deleteResourceImpl(resPath);
+            return null;
         });
     }
 
     private void deleteResourceWithRetry(final String resPath, final long 
timestamp) throws IOException {
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
-        retry.doWithRetry(new Callable() {
-            @Override
-            public Object call() throws IOException {
-                deleteResourceImpl(resPath, timestamp);
-                return null;
-            }
+        retry.doWithRetry(() -> {
+            deleteResourceImpl(resPath, timestamp);
+            return null;
         });
     }
 
@@ -574,11 +547,11 @@ abstract public class ResourceStore {
     /**
      * get a readable string of a resource path
      */
-    final public String getReadableResourcePath(String resPath) {
+    public final String getReadableResourcePath(String resPath) {
         return getReadableResourcePathImpl(norm(resPath));
     }
 
-    abstract protected String getReadableResourcePathImpl(String resPath);
+    protected abstract String getReadableResourcePathImpl(String resPath);
 
     private String norm(String resPath) {
         resPath = resPath.trim();
@@ -727,7 +700,7 @@ abstract public class ResourceStore {
      * Visit all resource under a folder (optionally recursively), without 
loading the content of resource.
      * Low level API, DON'T support ExponentialBackoffRetry, caller should do 
necessary retry
      */
-    final public void visitFolder(String folderPath, boolean recursive, 
Visitor visitor) throws IOException {
+    public final void visitFolder(String folderPath, boolean recursive, 
Visitor visitor) throws IOException {
         visitFolderInner(folderPath, recursive, null, false, visitor);
     }
 
@@ -735,7 +708,7 @@ abstract public class ResourceStore {
      * Visit all resource under a folder (optionally recursively), without 
loading the content of resource.
      * Low level API, DON'T support ExponentialBackoffRetry, caller should do 
necessary retry
      */
-    final public void visitFolder(String folderPath, boolean recursive, 
VisitFilter filter, Visitor visitor)
+    public final void visitFolder(String folderPath, boolean recursive, 
VisitFilter filter, Visitor visitor)
             throws IOException {
         visitFolderInner(folderPath, recursive, filter, false, visitor);
     }
@@ -744,7 +717,7 @@ abstract public class ResourceStore {
      * Visit all resource and their content under a folder (optionally 
recursively).
      * Low level API, DON'T support ExponentialBackoffRetry, caller should do 
necessary retry
      */
-    final public void visitFolderAndContent(String folderPath, boolean 
recursive, VisitFilter filter, Visitor visitor)
+    public final void visitFolderAndContent(String folderPath, boolean 
recursive, VisitFilter filter, Visitor visitor)
             throws IOException {
         visitFolderInner(folderPath, recursive, filter, true, visitor);
     }
@@ -774,7 +747,7 @@ abstract public class ResourceStore {
      *
      * NOTE: Broken content exception should be wrapped by RawResource, and 
return to caller to decide how to handle.
      */
-    abstract protected void visitFolderImpl(String folderPath, boolean 
recursive, VisitFilter filter,
+    protected abstract void visitFolderImpl(String folderPath, boolean 
recursive, VisitFilter filter,
             boolean loadContent, Visitor visitor) throws IOException;
 
     public static String dumpResources(KylinConfig kylinConfig, 
Collection<String> dumpList) throws IOException {
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java 
b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
index 17eebd6..cb56c25 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
@@ -242,7 +242,7 @@ public class MetadataCleanupJob {
     }
 
     private NavigableSet<String> noNull(NavigableSet<String> list) {
-        return (list == null) ? new TreeSet<String>() : list;
+        return (list == null) ? new TreeSet<>() : list;
     }
 
     private long getTimestamp(String resPath) {

Reply via email to