shaofengshi closed pull request #178: KYLIN-3470 Add cache for execute and execute_output URL: https://github.com/apache/kylin/pull/178
This is a PR merged from a forked repository. Because GitHub hides the original diff of a pull request from a fork once it has been merged, the diff is reproduced below for the sake of provenance: diff --git a/core-job/src/main/java/org/apache/kylin/job/JobSearchResult.java b/core-job/src/main/java/org/apache/kylin/job/JobSearchResult.java new file mode 100644 index 0000000000..bd4326dd3e --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/JobSearchResult.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job; + +public class JobSearchResult implements Comparable<JobSearchResult> { + private String id; + + private String jobName; + + private String cubeName; + + private long lastModified; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public String getCubeName() { + return cubeName; + } + + public void setCubeName(String cubeName) { + this.cubeName = cubeName; + } + + public long getLastModified() { + return lastModified; + } + + public void setLastModified(long lastModified) { + this.lastModified = lastModified; + } + + @Override + public int compareTo(JobSearchResult o) { + return o.lastModified < this.lastModified ? -1 : o.lastModified > this.lastModified ? 1 : 0; + } +} diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java index 1b34aa1ca7..0cc6c8e5fb 100644 --- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java +++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java @@ -28,7 +28,11 @@ import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.util.AutoReadWriteLock; import org.apache.kylin.job.exception.PersistentException; +import org.apache.kylin.metadata.cachesync.Broadcaster; +import org.apache.kylin.metadata.cachesync.CachedCrudAssist; +import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,9 +59,140 @@ static ExecutableDao newInstance(KylinConfig config) throws IOException { private ResourceStore store; - private ExecutableDao(KylinConfig config) { + private 
CaseInsensitiveStringCache<ExecutablePO> executableDigestMap; + + private CaseInsensitiveStringCache<ExecutableOutputPO> executableOutputDigestMap; + + private CachedCrudAssist<ExecutablePO> executableDigestCrud; + + private CachedCrudAssist<ExecutableOutputPO> executableOutputDigestCrud; + + private AutoReadWriteLock executableDigestMapLock = new AutoReadWriteLock(); + + private AutoReadWriteLock executableOutputDigestMapLock = new AutoReadWriteLock(); + + private ExecutableDao(KylinConfig config) throws IOException { logger.info("Using metadata url: " + config); this.store = ResourceStore.getStore(config); + this.executableDigestMap = new CaseInsensitiveStringCache<>(config, "execute"); + this.executableDigestCrud = new CachedCrudAssist<ExecutablePO>(store, ResourceStore.EXECUTE_RESOURCE_ROOT, "", + ExecutablePO.class, executableDigestMap, false) { + @Override + public ExecutablePO reloadAt(String path) { + try { + ExecutablePO executablePO = readJobResource(path); + if (executablePO == null) { + logger.warn("No job found at " + path + ", returning null"); + executableDigestMap.removeLocal(resourceName(path)); + return null; + } + + // create a digest + ExecutablePO digestExecutablePO = new ExecutablePO(); + digestExecutablePO.setUuid(executablePO.getUuid()); + digestExecutablePO.setName(executablePO.getName()); + digestExecutablePO.setLastModified(executablePO.getLastModified()); + digestExecutablePO.setType(executablePO.getType()); + digestExecutablePO.setParams(executablePO.getParams()); + executableDigestMap.putLocal(resourceName(path), digestExecutablePO); + return digestExecutablePO; + } catch (Exception e) { + throw new IllegalStateException("Error loading execute at " + path, e); + } + } + + @Override + protected ExecutablePO initEntityAfterReload(ExecutablePO entity, String resourceName) { + return entity; + } + }; + this.executableDigestCrud.setCheckCopyOnWrite(true); + this.executableDigestCrud.reloadAll(); + + this.executableOutputDigestMap = new 
CaseInsensitiveStringCache<>(config, "execute_output"); + this.executableOutputDigestCrud = new CachedCrudAssist<ExecutableOutputPO>(store, ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT, + "", ExecutableOutputPO.class, executableOutputDigestMap, false) { + @Override + public void reloadAll() throws IOException { + logger.debug("Reloading execute_output from " + ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT); + executableOutputDigestMap.clear(); + + NavigableSet<String> paths = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT); + + if (paths != null) { + for (String path : paths) { + if (!isTaskExecutableOutput(resourceName(path))) + reloadAt(path); + } + + logger.debug("Loaded " + executableOutputDigestMap.size() + " execute_output digest(s) out of " + paths.size() + + " resource"); + } + } + + @Override + public ExecutableOutputPO reloadAt(String path) { + try { + ExecutableOutputPO executableOutputPO = readJobOutputResource(path); + if (executableOutputPO == null) { + logger.warn("No job output found at " + path + ", returning null"); + executableOutputDigestMap.removeLocal(resourceName(path)); + return null; + } + + // create a digest + ExecutableOutputPO digestExecutableOutputPO = new ExecutableOutputPO(); + digestExecutableOutputPO.setUuid(executableOutputPO.getUuid()); + digestExecutableOutputPO.setLastModified(executableOutputPO.getLastModified()); + digestExecutableOutputPO.setStatus(executableOutputPO.getStatus()); + executableOutputDigestMap.putLocal(resourceName(path), digestExecutableOutputPO); + return digestExecutableOutputPO; + } catch (Exception e) { + throw new IllegalStateException("Error loading execute at " + path, e); + } + } + + @Override + protected ExecutableOutputPO initEntityAfterReload(ExecutableOutputPO entity, String resourceName) { + return entity; + } + }; + this.executableOutputDigestCrud.setCheckCopyOnWrite(true); + this.executableOutputDigestCrud.reloadAll(); + Broadcaster.getInstance(config).registerListener(new 
JobSyncListener(), "execute"); + Broadcaster.getInstance(config).registerListener(new JobOutputSyncListener(), "execute_output"); + } + + private boolean isTaskExecutableOutput(String id) { + return id.length() > 36; + } + + private class JobSyncListener extends Broadcaster.Listener { + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) + throws IOException { + try (AutoReadWriteLock.AutoLock l = executableDigestMapLock.lockForWrite()) { + if (event == Broadcaster.Event.DROP) + executableDigestMap.removeLocal(cacheKey); + else + executableDigestCrud.reloadQuietly(cacheKey); + } + } + } + + private class JobOutputSyncListener extends Broadcaster.Listener { + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) + throws IOException { + try (AutoReadWriteLock.AutoLock l = executableOutputDigestMapLock.lockForWrite()) { + if (!isTaskExecutableOutput(cacheKey)) { + if (event == Broadcaster.Event.DROP) + executableOutputDigestMap.removeLocal(cacheKey); + else + executableOutputDigestCrud.reloadQuietly(cacheKey); + } + } + } } private String pathOfJob(ExecutablePO job) { @@ -106,6 +241,15 @@ private long writeJobOutputResource(String path, ExecutableOutputPO output) thro } } + public List<ExecutableOutputPO> getJobOutputDigests(long timeStart, long timeEndExclusive) { + List<ExecutableOutputPO> jobOutputDigests = Lists.newArrayList(); + for (ExecutableOutputPO po : executableOutputDigestMap.values()) { + if (po.getLastModified() >= timeStart && po.getLastModified() < timeEndExclusive) + jobOutputDigests.add(po); + } + return jobOutputDigests; + } + public List<ExecutablePO> getJobs() throws PersistentException { try { return store.getAllResources(ResourceStore.EXECUTE_RESOURCE_ROOT, ExecutablePO.class, JOB_SERIALIZER); @@ -124,6 +268,15 @@ private long writeJobOutputResource(String path, ExecutableOutputPO output) thro } } + public 
List<ExecutablePO> getJobDigests(long timeStart, long timeEndExclusive) { + List<ExecutablePO> jobDigests = Lists.newArrayList(); + for (ExecutablePO po : executableDigestMap.values()) { + if (po.getLastModified() >= timeStart && po.getLastModified() < timeEndExclusive) + jobDigests.add(po); + } + return jobDigests; + } + public List<String> getJobIds() throws PersistentException { try { NavigableSet<String> resources = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT); @@ -156,6 +309,7 @@ public ExecutablePO addJob(ExecutablePO job) throws PersistentException { throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists"); } writeJobResource(pathOfJob(job), job); + executableDigestMap.put(job.getId(), job); return job; } catch (IOException e) { logger.error("error save job:" + job.getUuid(), e); @@ -170,6 +324,7 @@ public ExecutablePO updateJob(ExecutablePO job) throws PersistentException { } final long ts = writeJobResource(pathOfJob(job), job); job.setLastModified(ts); + executableDigestMap.put(job.getId(), job); return job; } catch (IOException e) { logger.error("error update job:" + job.getUuid(), e); @@ -180,6 +335,7 @@ public ExecutablePO updateJob(ExecutablePO job) throws PersistentException { public void deleteJob(String uuid) throws PersistentException { try { store.deleteResource(pathOfJob(uuid)); + executableDigestMap.remove(uuid); } catch (IOException e) { logger.error("error delete job:" + uuid, e); throw new PersistentException(e); @@ -205,6 +361,8 @@ public void addJobOutput(ExecutableOutputPO output) throws PersistentException { try { output.setLastModified(0); writeJobOutputResource(pathOfJobOutput(output.getUuid()), output); + if (!isTaskExecutableOutput(output.getUuid())) + executableOutputDigestMap.put(output.getUuid(), output); } catch (IOException e) { logger.error("error update job output id:" + output.getUuid(), e); throw new PersistentException(e); @@ -215,6 +373,8 @@ public void 
updateJobOutput(ExecutableOutputPO output) throws PersistentExceptio try { final long ts = writeJobOutputResource(pathOfJobOutput(output.getUuid()), output); output.setLastModified(ts); + if (!isTaskExecutableOutput(output.getUuid())) + executableOutputDigestMap.put(output.getUuid(), output); } catch (IOException e) { logger.error("error update job output id:" + output.getUuid(), e); throw new PersistentException(e); @@ -224,6 +384,8 @@ public void updateJobOutput(ExecutableOutputPO output) throws PersistentExceptio public void deleteJobOutput(String uuid) throws PersistentException { try { store.deleteResource(pathOfJobOutput(uuid)); + if (!isTaskExecutableOutput(uuid)) + executableOutputDigestMap.remove(uuid); } catch (IOException e) { logger.error("error delete job:" + uuid, e); throw new PersistentException(e); diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index abcb0481ea..d37b3da814 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -202,6 +202,16 @@ private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) { } } + public Map<String, ExecutableOutputPO> getAllOutputDigests(long timeStartInMillis, long timeEndInMillis) { + final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputDigests(timeStartInMillis, + timeEndInMillis); + HashMap<String, ExecutableOutputPO> result = Maps.newHashMap(); + for (ExecutableOutputPO jobOutput : jobOutputs) { + result.put(jobOutput.getId(), jobOutput); + } + return result; + } + public List<AbstractExecutable> getAllExecutables() { try { List<AbstractExecutable> ret = Lists.newArrayList(); @@ -238,6 +248,19 @@ private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) { } } + public List<AbstractExecutable> getAllExecutableDigests(long timeStartInMillis, long 
timeEndInMillis) { + List<AbstractExecutable> ret = Lists.newArrayList(); + for (ExecutablePO po : executableDao.getJobDigests(timeStartInMillis, timeEndInMillis)) { + try { + AbstractExecutable ae = parseTo(po); + ret.add(ae); + } catch (IllegalArgumentException e) { + logger.error("error parsing one executabePO: ", e); + } + } + return ret; + } + public List<String> getAllJobIds() { try { return executableDao.getJobIds(); diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java index be3d8d470f..37e632808a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/CachedCrudAssist.java @@ -108,7 +108,7 @@ private String resourcePath(String resourceName) { return resRootPath + "/" + resourceName + resPathSuffix; } - private String resourceName(String resourcePath) { + protected String resourceName(String resourcePath) { Preconditions.checkArgument(resourcePath.startsWith(resRootPath)); Preconditions.checkArgument(resourcePath.endsWith(resPathSuffix)); return resourcePath.substring(resRootPath.length() + 1, resourcePath.length() - resPathSuffix.length()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java index 862d7e5831..19fdca89b4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java @@ -27,11 +27,14 @@ import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JobInstance; +import org.apache.kylin.job.JobSearchResult; import org.apache.kylin.job.common.ShellExecutable; import 
org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.constant.JobStepStatusEnum; +import org.apache.kylin.job.dao.ExecutableOutputPO; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.CheckpointExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; import org.slf4j.Logger; @@ -201,4 +204,35 @@ public static JobStepStatusEnum parseToJobStepStatus(ExecutableState state) { throw new RuntimeException("invalid state:" + state); } } + + public static JobSearchResult parseToJobSearchResult(DefaultChainedExecutable job, Map<String, ExecutableOutputPO> outputs) { + if (job == null) { + logger.warn("job is null."); + return null; + } + + ExecutableOutputPO output = outputs.get(job.getId()); + if (output == null) { + logger.warn("job output is null."); + return null; + } + + final JobSearchResult result = new JobSearchResult(); + + String cubeName = CubingExecutableUtil.getCubeName(job.getParams()); + + if (cubeName == null) { + cubeName = job.getParam("model_name"); + } else { + CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + if (cube != null) { + cubeName = cube.getDisplayName(); + } + } + result.setCubeName(cubeName); + result.setId(job.getId()); + result.setJobName(job.getName()); + result.setLastModified(output.getLastModified()); + return result; + } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java index 1bf6ab6bc2..54076493bc 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -83,7 +83,7 @@ } try { - jobInstanceList = jobService.searchJobs(jobRequest.getCubeName(), 
jobRequest.getProjectName(), statusList, + jobInstanceList = jobService.searchJobsV2(jobRequest.getCubeName(), jobRequest.getProjectName(), statusList, jobRequest.getLimit(), jobRequest.getOffset(), timeFilter, jobSearchMode); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index b1b69f61db..2603267dd2 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -18,12 +18,19 @@ package org.apache.kylin.rest.service; -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.Date; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; + +import javax.annotation.Nullable; + import org.apache.commons.lang3.StringUtils; import org.apache.directory.api.util.Strings; import org.apache.kylin.common.KylinConfig; @@ -41,10 +48,12 @@ import org.apache.kylin.engine.mr.common.JobInfoConverter; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JobInstance; +import org.apache.kylin.job.JobSearchResult; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.SchedulerFactory; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.constant.JobTimeFilterEnum; +import org.apache.kylin.job.dao.ExecutableOutputPO; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.exception.JobException; import 
org.apache.kylin.job.exception.SchedulerException; @@ -74,16 +83,12 @@ import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.Calendar; -import java.util.Collections; -import java.util.Date; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * @author ysong1 @@ -387,7 +392,8 @@ public JobInstance submitRecoverSegmentOptimizeJob(CubeSegment segment, String s return optimizeJobInstance; } - public JobInstance submitLookupSnapshotJob(CubeInstance cube, String lookupTable, List<String> segmentIDs, String submitter) throws IOException { + public JobInstance submitLookupSnapshotJob(CubeInstance cube, String lookupTable, List<String> segmentIDs, + String submitter) throws IOException { LookupSnapshotBuildJob job = new LookupSnapshotJobBuilder(cube, lookupTable, segmentIDs, submitter).build(); getExecutableManager().addJob(job); @@ -660,17 +666,19 @@ public void dropJob(JobInstance job) { + SecurityContextHolder.getContext().getAuthentication().getName()); } + //******************************** Job search apis for Job controller V1 ******************************************* /** - * currently only support substring match - * - * @return - */ + * currently only support substring match + * + * @return + */ public List<JobInstance> searchJobs(final String cubeNameSubstring, final String projectName, final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) { Integer limit = (null == limitValue) ? 
30 : limitValue; Integer offset = (null == offsetValue) ? 0 : offsetValue; - List<JobInstance> jobs = searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter, jobSearchMode); + List<JobInstance> jobs = searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter, + jobSearchMode); Collections.sort(jobs); @@ -685,26 +693,18 @@ public void dropJob(JobInstance job) { return jobs.subList(offset, offset + limit); } + /** + * it loads all metadata of "execute" and "execute_output", and parses all job instances within the scope of the given filters + * + * @return List of job instances searched by the method + * + */ public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName, - final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { - return searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter, JobSearchMode.CUBING_ONLY); - } - - public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName, - final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, final JobSearchMode jobSearchMode) { + final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, + final JobSearchMode jobSearchMode) { return innerSearchJobs(cubeNameSubstring, null, projectName, statusList, timeFilter, jobSearchMode); } - public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName, - final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { - return searchJobsByJobName(jobName, projectName, statusList, timeFilter, JobSearchMode.ALL); - } - - public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName, - final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) { - return innerSearchJobs(null, jobName, projectName, statusList, timeFilter, jobSearchMode); - } - public List<JobInstance> 
innerSearchJobs(final String cubeName, final String jobName, final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) { List<JobInstance> result = Lists.newArrayList(); @@ -756,82 +756,90 @@ public boolean apply(@Nullable JobInstance input) { })); } + /** + * loads all metadata of "execute" and returns list of cubing job within the scope of the given filters + * + * @param allOutputs map of executable output data with type DefaultOutput parsed from ExecutableOutputPO + * + */ public List<CubingJob> innerSearchCubingJobs(final String cubeName, final String jobName, final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) { List<CubingJob> results = Lists.newArrayList( FluentIterable.from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis)) .filter(new Predicate<AbstractExecutable>() { - @Override - public boolean apply(AbstractExecutable executable) { - if (executable instanceof CubingJob) { - if (StringUtils.isEmpty(cubeName)) { - return true; + @Override + public boolean apply(AbstractExecutable executable) { + if (executable instanceof CubingJob) { + if (StringUtils.isEmpty(cubeName)) { + return true; + } + String executableCubeName = CubingExecutableUtil + .getCubeName(executable.getParams()); + if (executableCubeName == null) + return true; + if (nameExactMatch) + return executableCubeName.equalsIgnoreCase(cubeName); + else + return executableCubeName.toLowerCase().contains(cubeName.toLowerCase()); + } else { + return false; + } } - String executableCubeName = CubingExecutableUtil.getCubeName(executable.getParams()); - if (executableCubeName == null) - return true; - if (nameExactMatch) - return executableCubeName.equalsIgnoreCase(cubeName); - else - return executableCubeName.toLowerCase().contains(cubeName.toLowerCase()); - } else { - return 
false; - } - } - }).transform(new Function<AbstractExecutable, CubingJob>() { - @Override - public CubingJob apply(AbstractExecutable executable) { - return (CubingJob) executable; - } - }).filter(Predicates.and(new Predicate<CubingJob>() { - @Override - public boolean apply(CubingJob executable) { - if (null == projectName || null == getProjectManager().getProject(projectName)) { - return true; - } else { - return projectName.equalsIgnoreCase(executable.getProjectName()); - } - } - }, new Predicate<CubingJob>() { - @Override - public boolean apply(CubingJob executable) { - try { - Output output = allOutputs.get(executable.getId()); - if (output == null) { - return false; + }).transform(new Function<AbstractExecutable, CubingJob>() { + @Override + public CubingJob apply(AbstractExecutable executable) { + return (CubingJob) executable; } + }).filter(Predicates.and(new Predicate<CubingJob>() { + @Override + public boolean apply(CubingJob executable) { + if (null == projectName || null == getProjectManager().getProject(projectName)) { + return true; + } else { + return projectName.equalsIgnoreCase(executable.getProjectName()); + } + } + }, new Predicate<CubingJob>() { + @Override + public boolean apply(CubingJob executable) { + try { + Output output = allOutputs.get(executable.getId()); + if (output == null) { + return false; + } - ExecutableState state = output.getState(); - boolean ret = statusList.contains(state); - return ret; - } catch (Exception e) { - throw e; - } - } - }, new Predicate<CubingJob>() { - @Override - public boolean apply(@Nullable CubingJob cubeJob) { - if (cubeJob == null) { - return false; - } - - if (Strings.isEmpty(jobName)) { - return true; - } - - if (nameExactMatch) { - return cubeJob.getName().equalsIgnoreCase(jobName); - } else { - return cubeJob.getName().toLowerCase().contains(jobName.toLowerCase()); - } - } - }))); + ExecutableState state = output.getState(); + boolean ret = statusList.contains(state); + return ret; + } catch 
(Exception e) { + throw e; + } + } + }, new Predicate<CubingJob>() { + @Override + public boolean apply(@Nullable CubingJob cubeJob) { + if (cubeJob == null) { + return false; + } + + if (Strings.isEmpty(jobName)) { + return true; + } + + if (nameExactMatch) { + return cubeJob.getName().equalsIgnoreCase(jobName); + } else { + return cubeJob.getName().toLowerCase().contains(jobName.toLowerCase()); + } + } + }))); return results; } public List<JobInstance> innerSearchCheckpointJobs(final String cubeName, final String jobName, final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { + // TODO: use cache of jobs for this method // prepare time range Calendar calendar = Calendar.getInstance(); calendar.setTime(new Date()); @@ -855,83 +863,353 @@ public JobInstance apply(CheckpointExecutable checkpointExecutable) { public List<CheckpointExecutable> innerSearchCheckpointJobs(final String cubeName, final String jobName, final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) { - List<CheckpointExecutable> results = Lists - .newArrayList( - FluentIterable - .from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis)) - .filter(new Predicate<AbstractExecutable>() { - @Override - public boolean apply(AbstractExecutable executable) { - if (executable instanceof CheckpointExecutable) { - if (StringUtils.isEmpty(cubeName)) { - return true; - } - String executableCubeName = CubingExecutableUtil - .getCubeName(executable.getParams()); - if (executableCubeName == null) - return true; - if (nameExactMatch) - return executableCubeName.equalsIgnoreCase(cubeName); - else - return executableCubeName.toLowerCase() - .contains(cubeName.toLowerCase()); - } else { - return false; - } - } - }).transform(new Function<AbstractExecutable, CheckpointExecutable>() { - @Override - public CheckpointExecutable 
apply(AbstractExecutable executable) { - return (CheckpointExecutable) executable; + List<CheckpointExecutable> results = Lists.newArrayList( + FluentIterable.from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis)) + .filter(new Predicate<AbstractExecutable>() { + @Override + public boolean apply(AbstractExecutable executable) { + if (executable instanceof CheckpointExecutable) { + if (StringUtils.isEmpty(cubeName)) { + return true; } - }).filter(Predicates.and(new Predicate<CheckpointExecutable>() { - @Override - public boolean apply(CheckpointExecutable executable) { - if (null == projectName - || null == getProjectManager().getProject(projectName)) { - return true; - } else { - return projectName.equalsIgnoreCase(executable.getProjectName()); - } + String executableCubeName = CubingExecutableUtil + .getCubeName(executable.getParams()); + if (executableCubeName == null) + return true; + if (nameExactMatch) + return executableCubeName.equalsIgnoreCase(cubeName); + else + return executableCubeName.toLowerCase().contains(cubeName.toLowerCase()); + } else { + return false; + } + } + }).transform(new Function<AbstractExecutable, CheckpointExecutable>() { + @Override + public CheckpointExecutable apply(AbstractExecutable executable) { + return (CheckpointExecutable) executable; + } + }).filter(Predicates.and(new Predicate<CheckpointExecutable>() { + @Override + public boolean apply(CheckpointExecutable executable) { + if (null == projectName || null == getProjectManager().getProject(projectName)) { + return true; + } else { + return projectName.equalsIgnoreCase(executable.getProjectName()); + } + } + }, new Predicate<CheckpointExecutable>() { + @Override + public boolean apply(CheckpointExecutable executable) { + try { + Output output = allOutputs.get(executable.getId()); + if (output == null) { + return false; } - }, new Predicate<CheckpointExecutable>() { - @Override - public boolean apply(CheckpointExecutable executable) { - try { - Output 
output = allOutputs.get(executable.getId()); - if (output == null) { - return false; - } - - ExecutableState state = output.getState(); - boolean ret = statusList.contains(state); - return ret; - } catch (Exception e) { - throw e; - } + + ExecutableState state = output.getState(); + boolean ret = statusList.contains(state); + return ret; + } catch (Exception e) { + throw e; + } + } + }, new Predicate<CheckpointExecutable>() { + @Override + public boolean apply(@Nullable CheckpointExecutable checkpointExecutable) { + if (checkpointExecutable == null) { + return false; + } + + if (Strings.isEmpty(jobName)) { + return true; + } + + if (nameExactMatch) { + return checkpointExecutable.getName().equalsIgnoreCase(jobName); + } else { + return checkpointExecutable.getName().toLowerCase().contains(jobName.toLowerCase()); + } + } + }))); + return results; + } + //****************************** Job search apis for Job controller V1 end ***************************************** + + //******************************** Job search apis for Job controller V2 ******************************************* + public List<JobInstance> searchJobsV2(final String cubeNameSubstring, final String projectName, + final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue, + final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) { + Integer limit = (null == limitValue) ? 30 : limitValue; + Integer offset = (null == offsetValue) ? 
0 : offsetValue; + List<JobSearchResult> jobSearchResultList = searchJobsByCubeNameV2(cubeNameSubstring, projectName, statusList, + timeFilter, jobSearchMode); + + Collections.sort(jobSearchResultList); + + if (jobSearchResultList.size() <= offset) { + return Collections.emptyList(); + } + + // Fetch instance data of jobs for the searched job results + List<JobSearchResult> subJobSearchResultList; + if ((jobSearchResultList.size() - offset) < limit) { + subJobSearchResultList = jobSearchResultList.subList(offset, jobSearchResultList.size()); + } else { + subJobSearchResultList = jobSearchResultList.subList(offset, offset + limit); + } + + List<JobInstance> jobInstanceList = new ArrayList<>(); + for (JobSearchResult result : subJobSearchResultList) { + JobInstance jobInstance = getJobInstance(result.getId()); + jobInstanceList.add(jobInstance); + } + + return jobInstanceList; + } + + /** + * it loads all cache for digest metadata of "execute" and "execute_output", and returns the search results within the scope of the given filters + * + * @return List of search results searched by the method + * + */ + public List<JobSearchResult> searchJobsByCubeNameV2(final String cubeNameSubstring, final String projectName, + final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, + final JobSearchMode jobSearchMode) { + List<JobSearchResult> result = Lists.newArrayList(); + switch (jobSearchMode) { + case ALL: + result.addAll(innerSearchCubingJobsV2(cubeNameSubstring, null, projectName, statusList, timeFilter)); + result.addAll(innerSearchCheckpointJobsV2(cubeNameSubstring, null, projectName, statusList, timeFilter)); + break; + case CHECKPOINT_ONLY: + result.addAll(innerSearchCheckpointJobsV2(cubeNameSubstring, null, projectName, statusList, timeFilter)); + break; + case CUBING_ONLY: + default: + result.addAll(innerSearchCubingJobsV2(cubeNameSubstring, null, projectName, statusList, timeFilter)); + } + return result; + } + + public List<JobSearchResult> 
innerSearchCubingJobsV2(final String cubeName, final String jobName, + final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { + if (null == projectName) { + aclEvaluate.checkIsGlobalAdmin(); + } else { + aclEvaluate.checkProjectOperationPermission(projectName); + } + // prepare time range + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date()); + long timeStartInMillis = getTimeStartInMillis(calendar, timeFilter); + long timeEndInMillis = Long.MAX_VALUE; + Set<ExecutableState> states = convertStatusEnumToStates(statusList); + final Map<String, ExecutableOutputPO> allOutputDigests = getExecutableManager() + .getAllOutputDigests(timeStartInMillis, timeEndInMillis); + return Lists + .newArrayList(FluentIterable + .from(innerSearchCubingJobsV2(cubeName, jobName, states, timeStartInMillis, timeEndInMillis, + allOutputDigests, false, projectName)) + .transform(new Function<CubingJob, JobSearchResult>() { + @Override + public JobSearchResult apply(CubingJob cubingJob) { + return JobInfoConverter.parseToJobSearchResult(cubingJob, allOutputDigests); + } + }).filter(new Predicate<JobSearchResult>() { + @Override + public boolean apply(@Nullable JobSearchResult input) { + return input != null; + } + })); + } + + public List<JobSearchResult> innerSearchCheckpointJobsV2(final String cubeName, final String jobName, + final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { + if (null == projectName) { + aclEvaluate.checkIsGlobalAdmin(); + } else { + aclEvaluate.checkProjectOperationPermission(projectName); + } + // prepare time range + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date()); + long timeStartInMillis = getTimeStartInMillis(calendar, timeFilter); + long timeEndInMillis = Long.MAX_VALUE; + Set<ExecutableState> states = convertStatusEnumToStates(statusList); + final Map<String, ExecutableOutputPO> allOutputDigests = 
getExecutableManager() + .getAllOutputDigests(timeStartInMillis, timeEndInMillis); + return Lists.newArrayList(FluentIterable + .from(innerSearchCheckpointJobsV2(cubeName, jobName, states, timeStartInMillis, timeEndInMillis, + allOutputDigests, false, projectName)) + .transform(new Function<CheckpointExecutable, JobSearchResult>() { + @Override + public JobSearchResult apply(CheckpointExecutable checkpointExecutable) { + return JobInfoConverter.parseToJobSearchResult(checkpointExecutable, allOutputDigests); + } + }).filter(new Predicate<JobSearchResult>() { + @Override + public boolean apply(@Nullable JobSearchResult input) { + return input != null; + } + })); + } + + /** + * Called by searchJobsByCubeNameV2, it loads all cache of digest metadata of "execute" and returns list of cubing job within the scope of the given filters + * + * @param allExecutableOutputPO map of executable output data with type ExecutableOutputPO + * + */ + public List<CubingJob> innerSearchCubingJobsV2(final String cubeName, final String jobName, + final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, + final Map<String, ExecutableOutputPO> allExecutableOutputPO, final boolean nameExactMatch, + final String projectName) { + List<CubingJob> results = Lists.newArrayList( + FluentIterable.from(getExecutableManager().getAllExecutableDigests(timeStartInMillis, timeEndInMillis)) + .filter(new Predicate<AbstractExecutable>() { + @Override + public boolean apply(AbstractExecutable executable) { + if (executable instanceof CubingJob) { + if (StringUtils.isEmpty(cubeName)) { + return true; } - }, new Predicate<CheckpointExecutable>() { - @Override - public boolean apply(@Nullable CheckpointExecutable checkpointExecutable) { - if (checkpointExecutable == null) { - return false; - } - - if (Strings.isEmpty(jobName)) { - return true; - } - - if (nameExactMatch) { - return checkpointExecutable.getName().equalsIgnoreCase(jobName); - } else { - return 
checkpointExecutable.getName().toLowerCase() - .contains(jobName.toLowerCase()); - } + String executableCubeName = CubingExecutableUtil + .getCubeName(executable.getParams()); + if (executableCubeName == null) + return true; + if (nameExactMatch) + return executableCubeName.equalsIgnoreCase(cubeName); + else + return executableCubeName.toLowerCase().contains(cubeName.toLowerCase()); + } else { + return false; + } + } + }).transform(new Function<AbstractExecutable, CubingJob>() { + @Override + public CubingJob apply(AbstractExecutable executable) { + return (CubingJob) executable; + } + }).filter(Predicates.and(new Predicate<CubingJob>() { + @Override + public boolean apply(CubingJob executable) { + if (null == projectName || null == getProjectManager().getProject(projectName)) { + return true; + } else { + return projectName.equalsIgnoreCase(executable.getProjectName()); + } + } + }, new Predicate<CubingJob>() { + @Override + public boolean apply(CubingJob executable) { + try { + ExecutableOutputPO executableOutputPO = allExecutableOutputPO + .get(executable.getId()); + ExecutableState state = ExecutableState.valueOf(executableOutputPO.getStatus()); + return statusList.contains(state); + + } catch (Exception e) { + throw e; + } + } + }, new Predicate<CubingJob>() { + @Override + public boolean apply(@Nullable CubingJob cubeJob) { + if (cubeJob == null) { + return false; + } + + if (Strings.isEmpty(jobName)) { + return true; + } + + if (nameExactMatch) { + return cubeJob.getName().equalsIgnoreCase(jobName); + } else { + return cubeJob.getName().toLowerCase().contains(jobName.toLowerCase()); + } + } + }))); + return results; + } + + public List<CheckpointExecutable> innerSearchCheckpointJobsV2(final String cubeName, final String jobName, + final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, + final Map<String, ExecutableOutputPO> allExecutableOutputPO, final boolean nameExactMatch, + final String projectName) { + 
List<CheckpointExecutable> results = Lists.newArrayList( + FluentIterable.from(getExecutableManager().getAllExecutableDigests(timeStartInMillis, timeEndInMillis)) + .filter(new Predicate<AbstractExecutable>() { + @Override + public boolean apply(AbstractExecutable executable) { + if (executable instanceof CheckpointExecutable) { + if (StringUtils.isEmpty(cubeName)) { + return true; } - }))); + String executableCubeName = CubingExecutableUtil + .getCubeName(executable.getParams()); + if (executableCubeName == null) + return true; + if (nameExactMatch) + return executableCubeName.equalsIgnoreCase(cubeName); + else + return executableCubeName.toLowerCase().contains(cubeName.toLowerCase()); + } else { + return false; + } + } + }).transform(new Function<AbstractExecutable, CheckpointExecutable>() { + @Override + public CheckpointExecutable apply(AbstractExecutable executable) { + return (CheckpointExecutable) executable; + } + }).filter(Predicates.and(new Predicate<CheckpointExecutable>() { + @Override + public boolean apply(CheckpointExecutable executable) { + if (null == projectName || null == getProjectManager().getProject(projectName)) { + return true; + } else { + return projectName.equalsIgnoreCase(executable.getProjectName()); + } + } + }, new Predicate<CheckpointExecutable>() { + @Override + public boolean apply(CheckpointExecutable executable) { + try { + ExecutableOutputPO executableOutputPO = allExecutableOutputPO + .get(executable.getId()); + ExecutableState state = ExecutableState.valueOf(executableOutputPO.getStatus()); + return statusList.contains(state); + + } catch (Exception e) { + throw e; + } + } + }, new Predicate<CheckpointExecutable>() { + @Override + public boolean apply(@Nullable CheckpointExecutable checkpointExecutable) { + if (checkpointExecutable == null) { + return false; + } + + if (Strings.isEmpty(jobName)) { + return true; + } + + if (nameExactMatch) { + return checkpointExecutable.getName().equalsIgnoreCase(jobName); + } else { + return 
checkpointExecutable.getName().toLowerCase().contains(jobName.toLowerCase()); + } + } + }))); return results; } + //****************************** Job search apis for Job controller V2 end ***************************************** + public List<CubingJob> listJobsByRealizationName(final String realizationName, final String projectName, final Set<ExecutableState> statusList) { return innerSearchCubingJobs(realizationName, null, statusList, 0L, Long.MAX_VALUE, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services