Philippe created MAPREDUCE-4688:
-----------------------------------

             Summary: setJarByClass does not work under JBoss AS 7
                 Key: MAPREDUCE-4688
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-4688
             Project: Hadoop Map/Reduce
          Issue Type: Bug
    Affects Versions: 1.0.3, 0.20.2
         Environment: Hadoop Cloudera CDH3 Cluster w/Hadoop 0.20.2 on CentOS 5/6
                      Client on JBoss AS 7.1.1.Final CentOS/Windows
            Reporter: Philippe
            Priority: Minor

Hello,

I’m using Hadoop as a client from a J2EE web application. One of the libraries within my EAR is a jar containing several Map/Reduce jobs. With JBoss AS 4, I had no problem running the jobs with the following code:

{code}
try {
    final Configuration conf = HBaseConfiguration.create();
    // Load all Hadoop configuration
    conf.addResource("core-site.xml");
    conf.addResource("hdfs-site.xml");
    conf.addResource("mapred-site.xml");
    conf.addResource("hbase-site.xml");

    final Job job = new Job(conf, "My Job");
    job.setJarByClass(MyJobClass.class);
    TableMapReduceUtil.initTableMapperJob(...);
    TableMapReduceUtil.initTableReducerJob(...);
    final boolean status = job.waitForCompletion(true);
} ...
{code}

We have since moved to JBoss AS 7, and setJarByClass no longer works. In *org.apache.hadoop.mapred.JobConf.findContainingJar(Class)*, the URL retrieved for the class does not use the *jar* protocol but the JBoss *vfs* protocol, so the method always returns null and the jar is not sent to the Map/Reduce cluster.

With the VFS protocol, the resource name may or may not be the actual file-system name of the resource. The class file sits inside the jar file, which may itself sit inside an ear file in a non-exploded deployment, so there is no file on disk corresponding to the resource. I guess similar issues may happen with the jar: protocol, though.

To make the job work with JBoss AS 7, I wrote the following subclass of Job. It overrides setJarByClass so that it creates a temporary jar file from the actual jar file read through VFS.

{code}
/**
 * Patch of Map/Red Job to handle VFS jar file
 */
public class VFSJob extends Job {

    /** Logger */
    private static final transient Logger logger = LoggerFactory.getLogger(VFSJob.class);

    public VFSJob() throws IOException {
        super();
    }

    public VFSJob(Configuration conf) throws IOException {
        super(conf);
    }

    public VFSJob(Configuration conf, String jobName) throws IOException {
        super(conf, jobName);
    }

    private File temporaryJarFile;

    /**
     * Patch of setJarByClass to handle VFS
     */
    @Override
    public void setJarByClass(Class<?> cls) {
        final ClassLoader loader = cls.getClassLoader();
        final String classFile = cls.getName().replaceAll("\\.", "/") + ".class";
        JarInputStream is = null;
        JarOutputStream os = null;
        try {
            final Enumeration<URL> itr = loader.getResources(classFile);
            while (itr.hasMoreElements()) {
                final URL classUrl = itr.nextElement();
                // This is the trick
                if (!"vfs".equals(classUrl.getProtocol())) {
                    continue;
                }
                final String jarFile = classUrl.getFile().substring(0,
                        classUrl.getFile().length() - (classFile.length() + 1)); // +1 because of '/'
                final URL jarUrl = new URL(classUrl.getProtocol(), classUrl.getHost(),
                        classUrl.getPort(), jarFile,
                        new org.jboss.vfs.protocol.VirtualFileURLStreamHandler());

                temporaryJarFile = File.createTempFile("mapred", ".jar");
                is = (JarInputStream) jarUrl.openStream();
                os = new JarOutputStream(new FileOutputStream(temporaryJarFile));
                final byte[] buffer = new byte[2048];
                for (JarEntry entry = is.getNextJarEntry(); entry != null; entry = is.getNextJarEntry()) {
                    os.putNextEntry(entry);
                    int bytesRead;
                    while ((bytesRead = is.read(buffer)) != -1) {
                        os.write(buffer, 0, bytesRead);
                    }
                }
                this.conf.setJar(temporaryJarFile.getPath());
                return;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) {
                    logger.error("Error closing input stream", e);
                }
            }
            if (os != null) {
                try {
                    os.close();
                } catch (IOException e) {
                    logger.error("Error closing output stream", e);
                }
            }
        }
        return;
    }

    /**
     * Clean the temporary jar file created
     */
    public void clean() {
        if (temporaryJarFile != null && temporaryJarFile.exists()) {
            final boolean isDeleted = temporaryJarFile.delete();
            if (!isDeleted) {
                logger.error("Error while deleting temporary jar " + temporaryJarFile);
            }
        }
    }
}
{code}

After the job has run, I must not forget to delete the temporary jar file:

{code}
VFSJob job = null;
try {
    final Configuration conf = HBaseConfiguration.create();
    // Load all Hadoop configuration
    conf.addResource("core-site.xml");
    conf.addResource("hdfs-site.xml");
    conf.addResource("mapred-site.xml");
    conf.addResource("hbase-site.xml");

    job = new VFSJob(conf, "My Job");
    job.setJarByClass(MyJobClass.class);
    TableMapReduceUtil.initTableMapperJob(...);
    TableMapReduceUtil.initTableReducerJob(...);
    final boolean status = job.waitForCompletion(true);
} ...
} finally {
    // job is still null if the constructor threw
    if (job != null) {
        job.clean();
    }
}
{code}
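
For readers who want to see why the lookup described above comes back empty, here is a minimal sketch of the kind of protocol check involved. It is a paraphrase written for illustration, not the Hadoop source: the class name ContainingJarSketch, the method jarPath, and the example paths are all hypothetical. It assumes the jar location is derived only from URLs whose protocol is "jar", which matches the behaviour reported in this issue.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

/** Hypothetical helper for illustration; not part of Hadoop or JBoss. */
class ContainingJarSketch {

    /**
     * Returns the jar path encoded in a class resource URL, or null when the
     * URL does not use the "jar" protocol.
     *
     * @param protocol value of URL.getProtocol() for the class resource
     * @param urlPath  value of URL.getPath() for the class resource
     */
    static String jarPath(String protocol, String urlPath) {
        if (!"jar".equals(protocol)) {
            // "vfs", "file", ...: no jar location can be derived this way
            return null;
        }
        String path = urlPath;
        // A jar URL path looks like file:/dir/jobs.jar!/com/example/MyJob.class
        if (path.startsWith("file:")) {
            path = path.substring("file:".length());
        }
        try {
            path = URLDecoder.decode(path, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
        // Keep only the part before the "!" separator, i.e. the jar itself
        int bang = path.indexOf('!');
        return bang >= 0 ? path.substring(0, bang) : path;
    }

    public static void main(String[] args) {
        // Made-up paths: one jar-style URL path, one vfs-style URL path
        System.out.println(jarPath("jar", "file:/tmp/jobs.jar!/com/example/MyJob.class"));
        System.out.println(jarPath("vfs", "/content/app.ear/lib/jobs.jar/com/example/MyJob.class"));
    }
}
```

With a check of this shape, any class served through a vfs URL yields null, so no jar is attached to the job. That is the gap the VFSJob override is meant to fill.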