Author: mattmann Date: Wed Apr 22 03:50:52 2015 New Revision: 1675251 URL: http://svn.apache.org/r1675251 Log: More remainder of NUTCH-1973: Job Administration end point for the REST service contributed by Sujen Shah <sujen1...@gmail.com>
Added: nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java Added: nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java?rev=1675251&view=auto ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java (added) +++ nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java Wed Apr 22 03:50:52 2015 @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.service.impl; + +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.nutch.service.JobManager.JobType; +import org.apache.nutch.crawl.CrawlDb; +import org.apache.nutch.crawl.DeduplicationJob; +import org.apache.nutch.crawl.Generator; +import org.apache.nutch.crawl.Injector; +import org.apache.nutch.crawl.LinkDb; +import org.apache.nutch.fetcher.Fetcher; +import org.apache.nutch.parse.ParseSegment; +import org.apache.nutch.util.NutchTool; + +import com.google.common.collect.Maps; + +public class JobFactory { + private static Map<JobType, Class<? extends NutchTool>> typeToClass; + + static { + typeToClass = Maps.newHashMap(); + typeToClass.put(JobType.INJECT, Injector.class); + typeToClass.put(JobType.GENERATE, Generator.class); + typeToClass.put(JobType.FETCH, Fetcher.class); + typeToClass.put(JobType.PARSE, ParseSegment.class); + typeToClass.put(JobType.UPDATEDB, CrawlDb.class); + typeToClass.put(JobType.INVERTLINKS, LinkDb.class); + typeToClass.put(JobType.DEDUP, DeduplicationJob.class); + } + + public NutchTool createToolByType(JobType type, Configuration conf) { + if (!typeToClass.containsKey(type)) { + return null; + } + Class<? extends NutchTool> clz = typeToClass.get(type); + return createTool(clz, conf); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public NutchTool createToolByClassName(String className, Configuration conf) { + try { + Class clz = Class.forName(className); + return createTool(clz, conf); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + } + + private NutchTool createTool(Class<? extends NutchTool> clz, + Configuration conf) { + return ReflectionUtils.newInstance(clz, conf); + } + +} \ No newline at end of file Added: nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java?rev=1675251&view=auto ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java (added) +++ nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java Wed Apr 22 03:50:52 2015 @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.service.impl; + +import java.util.Collection; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.service.ConfManager; +import org.apache.nutch.service.JobManager; +import org.apache.nutch.service.model.request.JobConfig; +import org.apache.nutch.service.model.response.JobInfo; +import org.apache.nutch.service.model.response.JobInfo.State; +import org.apache.nutch.util.NutchTool; + +public class JobManagerImpl implements JobManager { + + private JobFactory jobFactory; + private NutchServerPoolExecutor executor; + private ConfManager configManager; + + public JobManagerImpl(JobFactory jobFactory, ConfManager configManager, NutchServerPoolExecutor executor) { + this.jobFactory = jobFactory; + this.configManager = configManager; + this.executor = executor; + } + + @Override + public JobInfo create(JobConfig jobConfig) { + if (jobConfig.getArgs() == null) { + throw new IllegalArgumentException("Arguments cannot be null!"); + } + Configuration conf = cloneConfiguration(jobConfig.getConfId()); + NutchTool tool = createTool(jobConfig, conf); + JobWorker worker = new JobWorker(jobConfig, conf, tool); + executor.execute(worker); + executor.purge(); + return worker.getInfo(); + } + + private Configuration cloneConfiguration(String confId) { + Configuration conf = configManager.get(confId); + if (conf == null) { + throw new IllegalArgumentException("Unknown confId " + confId); + } + return new Configuration(conf); + } + + @Override + public Collection<JobInfo> list(String crawlId, State state) { + if (state == null || state == State.ANY) { + return executor.getAllJobs(); + } + if (state == State.RUNNING || state == State.IDLE) { + return executor.getJobRunning(); + } + return executor.getJobHistory(); + } + + @Override + public JobInfo get(String crawlId, String jobId) { + return executor.getInfo(jobId); + } + + @Override + public boolean abort(String crawlId, String id) { + return executor.findWorker(id).killJob(); + } + + @Override + public boolean stop(String crawlId, String id) { + return executor.findWorker(id).stopJob(); + } + + private NutchTool createTool(JobConfig jobConfig, Configuration conf){ + if(StringUtils.isNotBlank(jobConfig.getJobClassName())){ + return jobFactory.createToolByClassName(jobConfig.getJobClassName(), conf); + } + return jobFactory.createToolByType(jobConfig.getType(), conf); + } +} Added: nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java?rev=1675251&view=auto ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java (added) +++ nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java Wed Apr 22 03:50:52 2015 @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.service.impl; + +import java.text.MessageFormat; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.service.model.request.JobConfig; +import org.apache.nutch.service.model.response.JobInfo; +import org.apache.nutch.service.model.response.JobInfo.State; +import org.apache.nutch.service.resources.ConfigResource; +import org.apache.nutch.util.NutchTool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JobWorker implements Runnable{ + + private JobInfo jobInfo; + private JobConfig jobConfig; + private static final Logger LOG = LoggerFactory.getLogger(JobWorker.class); + private NutchTool tool; + + /** + * To initialize JobWorker thread with the Job Configurations provided by user. + * @param jobConfig + * @param conf + * @param tool - NutchTool to run + */ + public JobWorker(JobConfig jobConfig, Configuration conf, NutchTool tool) { + this.jobConfig = jobConfig; + this.tool = tool; + if (jobConfig.getConfId() == null) { + jobConfig.setConfId(ConfigResource.DEFAULT); + } + + jobInfo = new JobInfo(generateId(), jobConfig, State.IDLE, "idle"); + if (jobConfig.getCrawlId() != null) { + conf.set(Nutch.CRAWL_ID_KEY, jobConfig.getCrawlId()); + } + } + + private String generateId() { + if (jobConfig.getCrawlId() == null) { + return MessageFormat.format("{0}-{1}-{2}", jobConfig.getConfId(), + jobConfig.getType(), String.valueOf(hashCode())); + } + return MessageFormat.format("{0}-{1}-{2}-{3}", jobConfig.getCrawlId(), + jobConfig.getConfId(), jobConfig.getType(), String.valueOf(hashCode())); + } + + @Override + public void run() { + try { + getInfo().setState(State.RUNNING); + getInfo().setMsg("OK"); + getInfo().setResult(tool.run(getInfo().getArgs(), getInfo().getCrawlId())); + getInfo().setState(State.FINISHED); + } catch (Exception e) { + LOG.error("Cannot run job worker!", e); + getInfo().setMsg("ERROR: " + e.toString()); + getInfo().setState(State.FAILED); + } + } + + public JobInfo getInfo() { + return jobInfo; + } + + /** + * To stop the executing job + * @return boolean true/false + */ + public boolean stopJob() { + getInfo().setState(State.STOPPING); + try { + return tool.stopJob(); + } catch (Exception e) { + throw new RuntimeException( + "Cannot stop job with id " + getInfo().getId(), e); + } + } + + public boolean killJob() { + getInfo().setState(State.KILLING); + try { + boolean result = tool.killJob(); + getInfo().setState(State.KILLED); + return result; + } catch (Exception e) { + throw new RuntimeException( + "Cannot kill job with id " + getInfo().getId(), e); + } + } + + public void setInfo(JobInfo jobInfo) { + this.jobInfo = jobInfo; + } + +} Added: nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java?rev=1675251&view=auto ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java (added) +++ nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java Wed Apr 22 03:50:52 2015 @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.service.impl; + +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.nutch.service.model.response.JobInfo; + +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; + + + +public class NutchServerPoolExecutor extends ThreadPoolExecutor{ + + private Queue<JobWorker> workersHistory; + private Queue<JobWorker> runningWorkers; + + public NutchServerPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue){ + super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue); + workersHistory = Queues.newArrayBlockingQueue(maxPoolSize); + runningWorkers = Queues.newArrayBlockingQueue(maxPoolSize); + } + + @Override + protected void beforeExecute(Thread thread, Runnable runnable) { + super.beforeExecute(thread, runnable); + synchronized (runningWorkers) { + runningWorkers.offer(((JobWorker) runnable)); + } + } + @Override + protected void afterExecute(Runnable runnable, Throwable throwable) { + super.afterExecute(runnable, throwable); + synchronized (runningWorkers) { + runningWorkers.remove(((JobWorker) runnable).getInfo()); + } + JobWorker worker = ((JobWorker) runnable); + addStatusToHistory(worker); + } + + private void addStatusToHistory(JobWorker worker) { + synchronized (workersHistory) { + if (!workersHistory.offer(worker)) { + workersHistory.poll(); + workersHistory.add(worker); + } + } + } + + /** + * Find the Job Worker Thread + * @param jobId + * @return + */ + public JobWorker findWorker(String jobId) { + synchronized (runningWorkers) { + for (JobWorker worker : runningWorkers) { + if (StringUtils.equals(worker.getInfo().getId(), jobId)) { + return worker; + } + } + } + return null; + } + + /** + * Gives the Job history + * @return + */ + public Collection<JobInfo> getJobHistory() { + return getJobsInfo(workersHistory); + } + + /** + * Gives the list of currently running jobs + * @return + */ + public Collection<JobInfo> getJobRunning() { + return getJobsInfo(runningWorkers); + } + + /** + * Gives all jobs(currently running and completed) + * @return + */ + @SuppressWarnings("unchecked") + public Collection<JobInfo> getAllJobs() { + return CollectionUtils.union(getJobRunning(), getJobHistory()); + } + + private Collection<JobInfo> getJobsInfo(Collection<JobWorker> workers) { + List<JobInfo> jobsInfo = Lists.newLinkedList(); + for (JobWorker worker : workers) { + jobsInfo.add(worker.getInfo()); + } + return jobsInfo; + } + + + public JobInfo getInfo(String jobId) { + for (JobInfo jobInfo : getAllJobs()) { + if (StringUtils.equals(jobId, jobInfo.getId())) { + return jobInfo; + } + } + return null; + } + +} Added: nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java?rev=1675251&view=auto ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java (added) +++ nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java Wed Apr 22 03:50:52 2015 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.service.model.request; + +import java.util.Map; + +public class DbQuery { + + private String confId; + private String type; + private Map<String, String> args; + private String crawlId; + + public String getConfId() { + return confId; + } + public void setConfId(String confId) { + this.confId = confId; + } + public Map<String, String> getArgs() { + return args; + } + public void setArgs(Map<String, String> args) { + this.args = args; + } + public String getType() { + return type; + } + public void setType(String type) { + this.type = type; + } + public String getCrawlId() { + return crawlId; + } + public void setCrawlId(String crawlId) { + this.crawlId = crawlId; + } + + + +} Added: nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java?rev=1675251&view=auto ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java (added) +++ nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java Wed Apr 22 03:50:52 2015 @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.service.resources; + + +import java.util.Map; + +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.crawl.CrawlDbReader; +import org.apache.nutch.service.model.request.DbQuery; + +@Path(value = "/db") +public class DbResource extends AbstractResource { + + @POST + @Path(value = "/crawldb") + @Consumes(MediaType.APPLICATION_JSON) + public Object readdb(DbQuery dbQuery){ + Configuration conf = configManager.get(dbQuery.getConfId()); + String type = dbQuery.getType(); + + if(type.equalsIgnoreCase("stats")){ + return crawlDbStats(conf, dbQuery.getArgs(), dbQuery.getCrawlId()); + } + if(type.equalsIgnoreCase("dump")){ + return crawlDbDump(conf, dbQuery.getArgs(), dbQuery.getCrawlId()); + } + if(type.equalsIgnoreCase("topN")){ + return crawlDbTopN(conf, dbQuery.getArgs(), dbQuery.getCrawlId()); + } + if(type.equalsIgnoreCase("url")){ + return crawlDbUrl(conf, dbQuery.getArgs(), dbQuery.getCrawlId()); + } + return null; + + } + + @SuppressWarnings("resource") + private Response crawlDbStats(Configuration conf, Map<String, String> args, String crawlId){ + CrawlDbReader dbr = new CrawlDbReader(); + try{ + return Response.ok(dbr.query(args, conf, "stats", crawlId)).build(); + }catch(Exception e){ + e.printStackTrace(); + return Response.serverError().entity(e.getMessage()).build(); + } + } + + @Produces(MediaType.APPLICATION_OCTET_STREAM) + private Response crawlDbDump(Configuration conf, Map<String, String> args, String crawlId){ + CrawlDbReader dbr = new CrawlDbReader(); + try{ + return Response.ok(dbr.query(args, conf, "dump", crawlId), MediaType.APPLICATION_OCTET_STREAM).build(); + }catch(Exception e){ + e.printStackTrace(); + return Response.serverError().entity(e.getMessage()).build(); + } + } + + @Produces(MediaType.APPLICATION_OCTET_STREAM) + private Response crawlDbTopN(Configuration conf, Map<String, String> args, String crawlId) { + CrawlDbReader dbr = new CrawlDbReader(); + try{ + return Response.ok(dbr.query(args, conf, "topN", crawlId), MediaType.APPLICATION_OCTET_STREAM).build(); + }catch(Exception e){ + e.printStackTrace(); + return Response.serverError().entity(e.getMessage()).build(); + } + } + + private Response crawlDbUrl(Configuration conf, Map<String, String> args, String crawlId){ + CrawlDbReader dbr = new CrawlDbReader(); + try{ + return Response.ok(dbr.query(args, conf, "url", crawlId)).build(); + }catch(Exception e){ + e.printStackTrace(); + return Response.serverError().entity(e.getMessage()).build(); + } + } +}