This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch 2.6.x in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 9349e4adf50e20b79e3675dd798009039041defb Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Sun Sep 1 23:15:27 2019 +0800 KYLIN-4153 Delete marker if real file not exists --- .../common/persistence/PushdownResourceStore.java | 29 ++++--- .../kylin/common/persistence/ResourceStore.java | 91 ++++++++-------------- .../apache/kylin/rest/job/MetadataCleanupJob.java | 2 +- 3 files changed, 51 insertions(+), 71 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java index 7cb8ca6..587b691 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java @@ -39,21 +39,21 @@ import org.slf4j.LoggerFactory; * - The big resource is saved as HDFS file according to its resource path. * - Method like checkAndPut() does not work on big resource like such, because HDFS lack of transaction support. */ -abstract public class PushdownResourceStore extends ResourceStore { - private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class); +public abstract class PushdownResourceStore extends ResourceStore { + private static final Logger logger = LoggerFactory.getLogger(PushdownResourceStore.class); protected PushdownResourceStore(KylinConfig kylinConfig) { super(kylinConfig); } - final protected void putResourceImpl(String resPath, ContentWriter content, long ts) throws IOException { + protected final void putResourceImpl(String resPath, ContentWriter content, long ts) throws IOException { if (content.isBigContent()) putBigResource(resPath, content, ts); else putSmallResource(resPath, content, ts); } - abstract protected void putSmallResource(String resPath, ContentWriter content, long ts) throws IOException; + protected abstract void putSmallResource(String resPath, ContentWriter content, long ts) throws IOException; final void putBigResource(String resPath, ContentWriter content, long newTS) throws IOException { @@ -84,12 +84,14 @@ abstract public class PushdownResourceStore extends ResourceStore { Path backPath; boolean hasOldFile; boolean hasRollback = false; + String resPathStr; private RollbackablePushdown(String resPath, ContentWriter content) throws IOException { int salt = System.identityHashCode(resPath) + System.identityHashCode(content); tempPath = pushdownPath(resPath + ".temp." + salt); realPath = pushdownPath(resPath); backPath = pushdownPath(resPath + ".orig." + salt); + resPathStr = resPath; fs = pushdownFS(); if (fs.exists(tempPath)) @@ -136,9 +138,12 @@ abstract public class PushdownResourceStore extends ResourceStore { if (fs.exists(realPath)) fs.delete(realPath, true); - if (hasOldFile) + if (hasOldFile) { fs.rename(backPath, realPath); - + } else { + logger.warn("Try delete empty entry {}", resPathStr); + deleteResourceImpl(resPathStr); + } } catch (IOException ex2) { logger.error("Rollback failed", ex2); } @@ -167,23 +172,25 @@ abstract public class PushdownResourceStore extends ResourceStore { try { Path p = pushdownPath(resPath); FileSystem fs = pushdownFS(); - if (fs.exists(p)) + if (fs.exists(p)) { return fs.open(p); - else + } else { + logger.error("Marker exists but real file not found, delete marker."); + deleteResourceImpl(resPath); throw new FileNotFoundException(p.toString() + " (FS: " + fs + ")"); - + } } catch (Exception ex) { throw new IOException("Failed to read big resource " + resPath, ex); } } - abstract protected String pushdownRootPath(); + protected abstract String pushdownRootPath(); protected FileSystem pushdownFS() { return HadoopUtil.getFileSystem(new Path(kylinConfig.getMetastoreBigCellHdfsDirectory())); } - final protected Path pushdownPath(String resPath) { + protected final Path pushdownPath(String resPath) { Path p = new Path(pushdownRootPath() + resPath); return Path.getPathWithoutSchemeAndAuthority(p); } diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index 3eeeb69..6568279 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -136,7 +136,7 @@ abstract public class ResourceStore { StringEntity.serializer); } StringEntity entity = getResource(ResourceStore.METASTORE_UUID_TAG, StringEntity.serializer); - return entity.toString(); + return entity == null ? "":entity.toString(); } /** @@ -286,12 +286,7 @@ abstract public class ResourceStore { private RawResource getResourceWithRetry(final String resPath) throws IOException { ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); - return retry.doWithRetry(new Callable<RawResource>() { - @Override - public RawResource call() throws IOException { - return getResourceImpl(resPath); - } - }); + return retry.doWithRetry(() -> getResourceImpl(resPath)); } final public long getResourceTimestamp(String resPath) throws IOException { @@ -307,12 +302,7 @@ abstract public class ResourceStore { final String path = norm(resPath); ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); - return retry.doWithRetry(new Callable<Long>() { - @Override - public Long call() throws IOException { - return getResourceTimestampImpl(path); - } - }); + return retry.doWithRetry(() -> getResourceTimestampImpl(path)); } /** @@ -364,19 +354,16 @@ abstract public class ResourceStore { protected void putResourceWithRetry(final String resPath, final ContentWriter content, final long ts) throws IOException { ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); - retry.doWithRetry(new Callable() { - @Override - public Object call() throws IOException { - putResourceImpl(resPath, content, ts); - return null; - } + retry.doWithRetry(() -> { + putResourceImpl(resPath, content, ts); + return null; }); } /** * check & set, overwrite a resource */ - final public <T extends RootPersistentEntity> void checkAndPutResource(String resPath, T obj, + public final <T extends RootPersistentEntity> void checkAndPutResource(String resPath, T obj, Serializer<T> serializer) throws IOException, WriteConflictException { checkAndPutResource(resPath, obj, System.currentTimeMillis(), serializer); } @@ -384,7 +371,7 @@ abstract public class ResourceStore { /** * check & set, overwrite a resource */ - final public <T extends RootPersistentEntity> void checkAndPutResource(String resPath, T obj, long newTS, + public final <T extends RootPersistentEntity> void checkAndPutResource(String resPath, T obj, long newTS, Serializer<T> serializer) throws IOException, WriteConflictException { resPath = norm(resPath); @@ -413,7 +400,7 @@ abstract public class ResourceStore { * * @return a confirmed TS, as some store may lose timestamp precision. */ - final public long checkAndPutResource(String resPath, byte[] content, long oldTS, long newTS) + public final long checkAndPutResource(String resPath, byte[] content, long oldTS, long newTS) throws IOException, WriteConflictException { return checkAndPutResourceCheckpoint(norm(resPath), content, oldTS, newTS); } @@ -427,24 +414,19 @@ abstract public class ResourceStore { /** * checks old timestamp when overwriting existing */ - abstract protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) + protected abstract long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, WriteConflictException; private long checkAndPutResourceWithRetry(final String resPath, final byte[] content, final long oldTS, final long newTS) throws IOException, WriteConflictException { ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); - return retry.doWithRetry(new Callable<Long>() { - @Override - public Long call() throws IOException { - return checkAndPutResourceImpl(resPath, content, oldTS, newTS); - } - }); + return retry.doWithRetry(() -> checkAndPutResourceImpl(resPath, content, oldTS, newTS)); } /** * update resource timestamp to timestamp */ - final public void updateTimestamp(String resPath, long timestamp) throws IOException { + public final void updateTimestamp(String resPath, long timestamp) throws IOException { logger.trace("Updating resource: {} with timestamp {} (Store {})", resPath, timestamp, kylinConfig.getMetadataUrl()); updateTimestampCheckPoint(norm(resPath), timestamp); @@ -457,21 +439,18 @@ abstract public class ResourceStore { private void updateTimestampWithRetry(final String resPath, final long timestamp) throws IOException { ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); - retry.doWithRetry(new Callable() { - @Override - public Object call() throws IOException { - updateTimestampImpl(resPath, timestamp); - return null; - } + retry.doWithRetry(() -> { + updateTimestampImpl(resPath, timestamp); + return null; }); } - abstract protected void updateTimestampImpl(String resPath, long timestamp) throws IOException; + protected abstract void updateTimestampImpl(String resPath, long timestamp) throws IOException; /** * delete a resource, does nothing on a folder */ - final public void deleteResource(String resPath) throws IOException { + public final void deleteResource(String resPath) throws IOException { logger.trace("Deleting resource {} (Store {})", resPath, kylinConfig.getMetadataUrl()); deleteResourceCheckpoint(norm(resPath)); } @@ -482,7 +461,7 @@ abstract public class ResourceStore { * throw an IOException when the resource lastModified >= timestamp + 1000 * See https://issues.apache.org/jira/browse/KYLIN-4030 */ - final public void deleteResource(String resPath, long timestamp) throws IOException { + public final void deleteResource(String resPath, long timestamp) throws IOException { logger.trace("Deleting resource {} within timestamp {} (Store {})", resPath, timestamp, kylinConfig.getMetadataUrl()); deleteResourceCheckpoint(norm(resPath), timestamp); @@ -498,29 +477,23 @@ abstract public class ResourceStore { deleteResourceWithRetry(resPath, timestamp); } - abstract protected void deleteResourceImpl(String resPath) throws IOException; + protected abstract void deleteResourceImpl(String resPath) throws IOException; - abstract protected void deleteResourceImpl(String resPath, long timestamp) throws IOException; + protected abstract void deleteResourceImpl(String resPath, long timestamp) throws IOException; private void deleteResourceWithRetry(final String resPath) throws IOException { ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); - retry.doWithRetry(new Callable() { - @Override - public Object call() throws IOException { - deleteResourceImpl(resPath); - return null; - } + retry.doWithRetry(() -> { + deleteResourceImpl(resPath); + return null; }); } private void deleteResourceWithRetry(final String resPath, final long timestamp) throws IOException { ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); - retry.doWithRetry(new Callable() { - @Override - public Object call() throws IOException { - deleteResourceImpl(resPath, timestamp); - return null; - } + retry.doWithRetry(() -> { + deleteResourceImpl(resPath, timestamp); + return null; }); } @@ -574,11 +547,11 @@ abstract public class ResourceStore { /** * get a readable string of a resource path */ - final public String getReadableResourcePath(String resPath) { + public final String getReadableResourcePath(String resPath) { return getReadableResourcePathImpl(norm(resPath)); } - abstract protected String getReadableResourcePathImpl(String resPath); + protected abstract String getReadableResourcePathImpl(String resPath); private String norm(String resPath) { resPath = resPath.trim(); @@ -727,7 +700,7 @@ abstract public class ResourceStore { * Visit all resource under a folder (optionally recursively), without loading the content of resource. * Low level API, DON'T support ExponentialBackoffRetry, caller should do necessary retry */ - final public void visitFolder(String folderPath, boolean recursive, Visitor visitor) throws IOException { + public final void visitFolder(String folderPath, boolean recursive, Visitor visitor) throws IOException { visitFolderInner(folderPath, recursive, null, false, visitor); } @@ -735,7 +708,7 @@ abstract public class ResourceStore { * Visit all resource under a folder (optionally recursively), without loading the content of resource. * Low level API, DON'T support ExponentialBackoffRetry, caller should do necessary retry */ - final public void visitFolder(String folderPath, boolean recursive, VisitFilter filter, Visitor visitor) + public final void visitFolder(String folderPath, boolean recursive, VisitFilter filter, Visitor visitor) throws IOException { visitFolderInner(folderPath, recursive, filter, false, visitor); } @@ -744,7 +717,7 @@ abstract public class ResourceStore { * Visit all resource and their content under a folder (optionally recursively). * Low level API, DON'T support ExponentialBackoffRetry, caller should do necessary retry */ - final public void visitFolderAndContent(String folderPath, boolean recursive, VisitFilter filter, Visitor visitor) + public final void visitFolderAndContent(String folderPath, boolean recursive, VisitFilter filter, Visitor visitor) throws IOException { visitFolderInner(folderPath, recursive, filter, true, visitor); } @@ -774,7 +747,7 @@ abstract public class ResourceStore { * * NOTE: Broken content exception should be wrapped by RawResource, and return to caller to decide how to handle. */ - abstract protected void visitFolderImpl(String folderPath, boolean recursive, VisitFilter filter, + protected abstract void visitFolderImpl(String folderPath, boolean recursive, VisitFilter filter, boolean loadContent, Visitor visitor) throws IOException; public static String dumpResources(KylinConfig kylinConfig, Collection<String> dumpList) throws IOException { diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java index 17eebd6..cb56c25 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java +++ b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java @@ -242,7 +242,7 @@ public class MetadataCleanupJob { } private NavigableSet<String> noNull(NavigableSet<String> list) { - return (list == null) ? new TreeSet<String>() : list; + return (list == null) ? new TreeSet<>() : list; } private long getTimestamp(String resPath) {