Philippe created MAPREDUCE-4688:
-----------------------------------

             Summary: setJarByClass does not work under JBoss AS 7
                 Key: MAPREDUCE-4688
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-4688
             Project: Hadoop Map/Reduce
          Issue Type: Bug
    Affects Versions: 1.0.3, 0.20.2
         Environment: Hadoop Cloudera CDH3 Cluster w/Hadoop 0.20.2 on CentOS 5/6
Client on JBoss AS 7.1.1.Final CentOS/Windows
            Reporter: Philippe
            Priority: Minor


Hello,

I’m using Hadoop as a client from a J2EE web application. One of the libraries 
within my EAR is a jar containing several Map/Reduce jobs. Using JBoss AS 4 in 
the past, I had no problem running the jobs with the following code:
{code}
try {
    final Configuration conf = HBaseConfiguration.create();
    // Load all Hadoop configuration
    conf.addResource("core-site.xml");
    conf.addResource("hdfs-site.xml");
    conf.addResource("mapred-site.xml");
    conf.addResource("hbase-site.xml");

    final Job job = new Job(conf, "My Job");
    job.setJarByClass(MyJobClass.class);

    TableMapReduceUtil.initTableMapperJob(...);
    TableMapReduceUtil.initTableReducerJob(...);

    final boolean status = job.waitForCompletion(true);
} ...
{code}

Since then we have moved to JBoss AS 7, and the method setJarByClass no longer 
works. In *org.apache.hadoop.mapred.JobConf.findContainingJar(Class)* the 
retrieved URL does not have a *jar* protocol but a JBoss *vfs* protocol, so the 
method always returns null and the jar is never shipped to the Map/Reduce 
cluster. With the vfs protocol, the resource name may or may not be the actual 
file-system name of the resource: the class file sits inside a jar, which may 
itself sit inside an EAR in a non-exploded deployment, so there is no 
java.io.File on disk corresponding to the resource. I suspect similar issues 
can occur with the jar: protocol.
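For context, the protocol check performed by findContainingJar can be sketched roughly as follows. This is a simplified illustration, not the exact Hadoop source; the class name FindJarSketch and the path handling are my own:

{code}
import java.io.IOException;
import java.net.URL;
import java.util.Enumeration;

public class FindJarSketch {

    // Simplified sketch of org.apache.hadoop.mapred.JobConf.findContainingJar:
    // only URLs with the "jar" protocol are recognized, so a JBoss "vfs" URL
    // falls through every loop iteration and the method returns null.
    static String findContainingJar(Class<?> clazz) throws IOException {
        final ClassLoader loader = clazz.getClassLoader();
        final String classFile = clazz.getName().replaceAll("\\.", "/") + ".class";
        for (Enumeration<URL> itr = loader.getResources(classFile); itr.hasMoreElements();) {
            final URL url = itr.nextElement();
            if ("jar".equals(url.getProtocol())) {
                // e.g. jar:file:/path/to/my.jar!/com/example/MyJobClass.class
                final String path = url.getPath();
                return path.substring("file:".length(), path.indexOf('!'));
            }
            // A "vfs" URL from JBoss AS 7 never matches this branch.
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        // Prints the containing jar path, or null when the class is loaded
        // from a directory classpath or through a non-jar protocol such as vfs.
        System.out.println(findContainingJar(FindJarSketch.class));
    }
}
{code}

When the class is deployed through JBoss VFS, every candidate URL has the vfs protocol, so this loop exits without a match and the jar is never set on the configuration.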

To make the job work under JBoss AS 7, I wrote the following subclass of the 
Job class. It overrides the setJarByClass mechanism, creating a temporary jar 
file from the actual jar read through VFS.

{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Patch of Map/Red Job to handle a VFS jar file.
 */
public class VFSJob extends Job {

    /** Logger */
    private static final Logger logger = LoggerFactory.getLogger(VFSJob.class);

    /** Temporary jar file handed to the cluster; deleted by clean(). */
    private File temporaryJarFile;

    public VFSJob() throws IOException {
        super();
    }

    public VFSJob(Configuration conf) throws IOException {
        super(conf);
    }

    public VFSJob(Configuration conf, String jobName) throws IOException {
        super(conf, jobName);
    }

    /**
     * Patch of setJarByClass to handle VFS.
     */
    @Override
    public void setJarByClass(Class<?> cls) {
        final ClassLoader loader = cls.getClassLoader();
        final String classFile = cls.getName().replaceAll("\\.", "/") + ".class";
        JarInputStream is = null;
        JarOutputStream os = null;
        try {
            final Enumeration<URL> itr = loader.getResources(classFile);
            while (itr.hasMoreElements()) {
                final URL classUrl = itr.nextElement();
                // This is the trick: only handle JBoss VFS URLs
                if (!"vfs".equals(classUrl.getProtocol())) {
                    continue;
                }

                // Strip the class path (+1 for the leading '/') to get the jar URL
                final String jarFile = classUrl.getFile().substring(0,
                        classUrl.getFile().length() - (classFile.length() + 1));
                final URL jarUrl = new URL(classUrl.getProtocol(),
                        classUrl.getHost(), classUrl.getPort(), jarFile,
                        new org.jboss.vfs.protocol.VirtualFileURLStreamHandler());

                temporaryJarFile = File.createTempFile("mapred", ".jar");

                // Wrap the raw stream (openStream() returns a plain InputStream)
                is = new JarInputStream(jarUrl.openStream());
                // JarInputStream consumes the manifest, so carry it over explicitly
                final Manifest manifest = is.getManifest();
                os = (manifest != null)
                        ? new JarOutputStream(new FileOutputStream(temporaryJarFile), manifest)
                        : new JarOutputStream(new FileOutputStream(temporaryJarFile));

                final byte[] buffer = new byte[2048];
                for (JarEntry entry = is.getNextJarEntry(); entry != null;
                        entry = is.getNextJarEntry()) {
                    os.putNextEntry(entry);
                    int bytesRead;
                    while ((bytesRead = is.read(buffer)) != -1) {
                        os.write(buffer, 0, bytesRead);
                    }
                }

                this.conf.setJar(temporaryJarFile.getPath());
                return;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) {
                    logger.error("Error closing input stream", e);
                }
            }
            if (os != null) {
                try {
                    os.close();
                } catch (IOException e) {
                    logger.error("Error closing output stream", e);
                }
            }
        }
    }

    /**
     * Clean up the temporary jar file created by setJarByClass.
     */
    public void clean() {
        if (temporaryJarFile != null && temporaryJarFile.exists()) {
            final boolean isDeleted = temporaryJarFile.delete();
            if (!isDeleted) {
                logger.error("Error while deleting temporary jar " + temporaryJarFile);
            }
        }
    }
}
{code}

So, after the call, I must not forget to delete the temporary jar file:
{code}
VFSJob job = null;
try {
    final Configuration conf = HBaseConfiguration.create();
    // Load all Hadoop configuration
    conf.addResource("core-site.xml");
    conf.addResource("hdfs-site.xml");
    conf.addResource("mapred-site.xml");
    conf.addResource("hbase-site.xml");

    job = new VFSJob(conf, "My Job");
    job.setJarByClass(MyJobClass.class);

    TableMapReduceUtil.initTableMapperJob(...);
    TableMapReduceUtil.initTableReducerJob(...);

    final boolean status = job.waitForCompletion(true);
} ... finally {
    job.clean();
}
{code}
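If the explicit clean() call in a finally block is easy to forget, one hypothetical variant would be to register the temporary jar for deletion when the JVM exits, right after creating it in setJarByClass. This is my own suggestion, not part of the patch above:

{code}
import java.io.File;
import java.io.IOException;

public class TempJarCleanup {
    public static void main(String[] args) throws IOException {
        // Hypothetical variant: let the JVM remove the temporary jar on exit
        // instead of relying on an explicit clean() call in a finally block.
        final File temporaryJarFile = File.createTempFile("mapred", ".jar");
        temporaryJarFile.deleteOnExit();
        System.out.println(temporaryJarFile.exists()); // exists until JVM shutdown
    }
}
{code}

Note that in a long-lived application server such as JBoss, deleteOnExit() only frees the files at shutdown, so temporary jars would accumulate between restarts; the explicit clean() remains preferable there.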

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
